From ffe40726a9196af03c7ff611fbed9109450130c5 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 25 Oct 2024 09:22:14 +0200 Subject: [PATCH] Extend StreamsBootstrap model (#534) Closes #520 --- docs/docs/schema/defaults.json | 980 +++++++++++++++++- docs/docs/schema/pipeline.json | 788 +++++++++++++- .../kafka_connect/connect_wrapper.py | 2 +- kpops/components/common/kubernetes_model.py | 111 ++ kpops/components/streams_bootstrap/model.py | 192 ++++ .../streams_bootstrap/producer/model.py | 53 +- .../streams_bootstrap/streams/model.py | 101 +- poetry.lock | 17 +- pyproject.toml | 1 + .../streams_bootstrap/test_producer_app.py | 29 + .../streams_bootstrap/test_streams_app.py | 14 + .../test_streams_bootstrap.py | 6 +- .../resources/streams-bootstrap/pipeline.yaml | 3 + .../test_streams_bootstrap/pipeline.yaml | 170 +++ .../test_streams_bootstrap/manifest.yaml | 79 +- 15 files changed, 2491 insertions(+), 55 deletions(-) create mode 100644 kpops/components/common/kubernetes_model.py diff --git a/docs/docs/schema/defaults.json b/docs/docs/schema/defaults.json index 96d716f24..63f3782a7 100644 --- a/docs/docs/schema/defaults.json +++ b/docs/docs/schema/defaults.json @@ -1,5 +1,14 @@ { "$defs": { + "Effects": { + "enum": [ + "NoExecute", + "NoSchedule", + "PreferNoSchedule" + ], + "title": "Effects", + "type": "string" + }, "FromSection": { "additionalProperties": false, "description": "Holds multiple input topics.", @@ -209,6 +218,16 @@ "title": "HelmRepoConfig", "type": "object" }, + "ImagePullPolicy": { + "description": "Represents the different Kubernetes image pull policies.\n\nhttps://kubernetes.io/docs/concepts/containers/images/#image-pull-policy", + "enum": [ + "Always", + "IfNotPresent", + "Never" + ], + "title": "ImagePullPolicy", + "type": "string" + }, "InputTopicTypes": { "description": "Input topic types.\n\n- INPUT: input topic\n- PATTERN: extra-topic-pattern or input-topic-pattern", "enum": [ @@ -218,6 +237,51 @@ "title": "InputTopicTypes", "type": "string" }, + "JMXConfig": { + "description": "JMX configuration options.", + "properties": { + "metricRules": { + "default": [ + ".*" + ], + "description": "List of JMX metric rules.", + "items": { + "type": "string" + }, + "title": "Metricrules", + "type": "array" + }, + "port": { + "default": 5555, + "description": "The jmx port which JMX style metrics are exposed.", + "title": "Port", + "type": "integer" + } + }, + "title": "JMXConfig", + "type": "object" + }, + "JavaOptions": { + "description": "JVM configuration options.", + "properties": { + "maxRAMPercentage": { + "default": 75, + "description": "Sets the maximum amount of memory that the JVM may use for the Java heap before applying ergonomics heuristics as a percentage of the maximum amount determined as described in the -XX:MaxRAM option", + "title": "Maxrampercentage", + "type": "integer" + }, + "others": { + "description": "List of Java VM options passed to the streams app.", + "items": { + "type": "string" + }, + "title": "Others", + "type": "array" + } + }, + "title": "JavaOptions", + "type": "object" + }, "KafkaApp": { "additionalProperties": true, "description": "Base component for Kafka-based components.", @@ -737,6 +801,14 @@ "title": "KubernetesAppValues", "type": "object" }, + "Operation": { + "enum": [ + "Exists", + "Equal" + ], + "title": "Operation", + "type": "string" + }, "OutputTopicTypes": { "description": "Types of output topic.\n\n- OUTPUT: output topic\n- ERROR: error topic", "enum": [ @@ -832,6 +904,56 @@ "title": "PipelineComponent", "type": "object" }, + "PortConfig": { + "description": "Base class for the port configuration of the Kafka Streams application.", + "properties": { + "containerPort": { + "description": "", + "title": "Containerport", + "type": "integer" + }, + "name": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Services can reference port by name (optional).", + "title": "Name" + }, + "schema": { + "allOf": [ + { + "$ref": "#/$defs/ProtocolSchema" + } + ], + "default": "TCP", + "description": "Protocol for port. Must be UDP, TCP, or SCTP." + }, + "servicePort": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Number of the port of the service (optional)", + "title": "Serviceport" + } + }, + "required": [ + "containerPort" + ], + "title": "PortConfig", + "type": "object" + }, "ProducerApp": { "additionalProperties": true, "description": "Producer component.\nThis producer holds configuration to use as values for the streams-bootstrap producer Helm chart. Note that the producer does not support error topics.", @@ -1054,6 +1176,94 @@ "additionalProperties": true, "description": "Settings specific to producers.", "properties": { + "affinity": { + "description": "Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity.", + "title": "Affinity", + "type": "object" + }, + "backoffLimit": { + "default": 6, + "description": "The number of times to restart an unsuccessful job.", + "title": "Backofflimit", + "type": "integer" + }, + "commandLine": { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "boolean" + }, + { + "type": "integer" + } + ] + }, + "description": "Map of command line arguments passed to the streams app.", + "title": "Commandline", + "type": "object" + }, + "configurationEnvPrefix": { + "default": "APP", + "description": "Prefix for environment variables to use that should be parsed as command line arguments.", + "title": "Configurationenvprefix", + "type": "string" + }, + "deployment": { + "default": false, + "description": "Deploy the producer as a Kubernetes Deployment (thereby ignoring Job-related configurations)", + "title": "Deployment", + "type": "boolean" + }, + "env": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom environment variables.", + "title": "Env", + "type": "object" + }, + "failedJobsHistoryLimit": { + "default": 1, + "description": "The number of unsuccessful jobs to retain.", + "title": "Failedjobshistorylimit", + "type": "integer" + }, + "files": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).", + "title": "Files", + "type": "object" + }, + "image": { + "description": "Docker image of the Kafka producer app.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "allOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + } + ], + "default": "Always", + "description": "Docker image pull policy." + }, + "imagePullSecrets": { + "description": "", + "items": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "title": "Imagepullsecrets", + "type": "array" + }, "imageTag": { "default": "latest", "description": "Docker image tag of the streams-bootstrap app.", @@ -1061,6 +1271,14 @@ "title": "Imagetag", "type": "string" }, + "javaOptions": { + "allOf": [ + { + "$ref": "#/$defs/JavaOptions" + } + ], + "description": "" + }, "kafka": { "allOf": [ { @@ -1069,6 +1287,11 @@ ], "description": "Kafka Streams settings" }, + "livenessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Livenessprobe", + "type": "object" + }, "nameOverride": { "anyOf": [ { @@ -1082,9 +1305,134 @@ "default": null, "description": "Helm chart name override, assigned automatically", "title": "Nameoverride" + }, + "podAnnotations": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom annotations to attach to the pod spec.", + "title": "Podannotations", + "type": "object" + }, + "podLabels": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom labels to attach to the pod spec.", + "title": "Podlabels", + "type": "object" + }, + "ports": { + "description": "", + "items": { + "$ref": "#/$defs/PortConfig" + }, + "title": "Ports", + "type": "array" + }, + "readinessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Readinessprobe", + "type": "object" + }, + "resources": { + "allOf": [ + { + "$ref": "#/$defs/Resources" + } + ], + "default": { + "limits": { + "cpu": "300m", + "memory": "2G" + }, + "requests": { + "cpu": "100m", + "memory": "500Mi" + } + }, + "description": "See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/" + }, + "restartPolicy": { + "allOf": [ + { + "$ref": "#/$defs/RestartPolicy" + } + ], + "default": "OnFailure", + "description": "See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy" + }, + "schedule": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Cron expression to denote a schedule this producer app should be run on. It will then be deployed as a CronJob instead of a Job.", + "title": "Schedule" + }, + "secretFilesRefs": { + "description": "Mount existing secrets as volumes", + "items": { + "type": "string" + }, + "title": "Secretfilesrefs", + "type": "array" + }, + "secretRefs": { + "description": "Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.", + "title": "Secretrefs", + "type": "object" + }, + "secrets": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.", + "title": "Secrets", + "type": "object" + }, + "service": { + "allOf": [ + { + "$ref": "#/$defs/ServiceConfig" + } + ], + "description": "" + }, + "successfulJobsHistoryLimit": { + "default": 1, + "description": "The number of successful jobs to retain.", + "title": "Successfuljobshistorylimit", + "type": "integer" + }, + "suspend": { + "default": false, + "description": "Whether to suspend the execution of the cron job.", + "title": "Suspend", + "type": "boolean" + }, + "tolerations": { + "description": "Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.", + "items": { + "$ref": "#/$defs/Toleration" + }, + "title": "Tolerations", + "type": "array" + }, + "ttlSecondsAfterFinished": { + "default": 100, + "description": "See https://kubernetes.io/docs/concepts/workloads/controllers/ttlafterfinished/#ttl-after-finished-controller", + "title": "Ttlsecondsafterfinished", + "type": "integer" } }, "required": [ + "image", "kafka" ], "title": "ProducerAppValues", @@ -1190,6 +1538,89 @@ "title": "ProducerStreamsConfig", "type": "object" }, + "PrometheusExporterConfig": { + "description": "Prometheus JMX exporter configuration.", + "properties": { + "jmx": { + "allOf": [ + { + "$ref": "#/$defs/PrometheusJMXExporterConfig" + } + ], + "description": "The prometheus JMX exporter configuration." + } + }, + "title": "PrometheusExporterConfig", + "type": "object" + }, + "PrometheusJMXExporterConfig": { + "description": "Prometheus JMX exporter configuration.", + "properties": { + "enabled": { + "default": true, + "description": "Whether to install Prometheus JMX Exporter as a sidecar container and expose JMX metrics to Prometheus.", + "title": "Enabled", + "type": "boolean" + }, + "image": { + "default": "solsson/kafka-prometheus-jmx-exporter@sha256", + "description": "Docker Image for Prometheus JMX Exporter container.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "allOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + } + ], + "default": "IfNotPresent", + "description": "Docker Image Pull Policy for Prometheus JMX Exporter container." + }, + "imageTag": { + "default": "6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143", + "description": "Docker Image Tag for Prometheus JMX Exporter container.", + "title": "Imagetag", + "type": "string" + }, + "port": { + "default": 5556, + "description": "JMX Exporter Port which exposes metrics in Prometheus format for scraping.", + "title": "Port", + "type": "integer" + }, + "resources": { + "allOf": [ + { + "$ref": "#/$defs/Resources" + } + ], + "default": { + "limits": { + "cpu": "300m", + "memory": "2G" + }, + "requests": { + "cpu": "100m", + "memory": "500Mi" + } + }, + "description": "JMX Exporter resources configuration." + } + }, + "title": "PrometheusJMXExporterConfig", + "type": "object" + }, + "ProtocolSchema": { + "description": "Represents the different Kubernetes protocols.\n\nhttps://kubernetes.io/docs/reference/networking/service-protocols/", + "enum": [ + "TCP", + "UDP", + "SCTP" + ], + "title": "ProtocolSchema", + "type": "string" + }, "RepoAuthFlags": { "description": "Authorisation-related flags for `helm repo`.", "properties": { @@ -1257,26 +1688,132 @@ "title": "RepoAuthFlags", "type": "object" }, - "StreamsApp": { - "additionalProperties": true, - "description": "StreamsApp component that configures a streams-bootstrap app.", + "ResourceDefinition": { + "description": "Model representing the 'limits' or `request` section of Kubernetes resource specifications.", "properties": { - "from": { + "cpu": { "anyOf": [ { - "$ref": "#/$defs/FromSection" + "type": "string" }, { - "type": "null" + "type": "integer" } ], - "default": null, - "description": "Topic(s) and/or components from which the component will read input", - "title": "From" + "description": "The maximum amount of CPU a container can use, expressed in milli CPUs (e.g., '300m').", + "title": "Cpu" }, - "name": { - "description": "Component name", - "title": "Name", + "memory": { + "description": "The maximum amount of memory a container can use, with valid units such as 'Mi' or 'Gi' (e.g., '2G').", + "pattern": "^\\d+([EPTGMk]|Ei|Pi|Ti|Gi|Mi|Ki)?$", + "title": "Memory", + "type": "string" + } + }, + "required": [ + "cpu", + "memory" + ], + "title": "ResourceDefinition", + "type": "object" + }, + "Resources": { + "description": "Model representing the resource specifications for a Kubernetes container.", + "properties": { + "limits": { + "allOf": [ + { + "$ref": "#/$defs/ResourceDefinition" + } + ], + "description": "The maximum resource limits for the container." + }, + "requests": { + "allOf": [ + { + "$ref": "#/$defs/ResourceDefinition" + } + ], + "description": "The minimum resource requirements for the container." + } + }, + "required": [ + "requests", + "limits" + ], + "title": "Resources", + "type": "object" + }, + "RestartPolicy": { + "enum": [ + "Always", + "OnFailure", + "Never" + ], + "title": "RestartPolicy", + "type": "string" + }, + "ServiceConfig": { + "description": "Base model for configuring a service for the Kafka Streams application.", + "properties": { + "enabled": { + "default": false, + "description": "Whether to create a service.", + "title": "Enabled", + "type": "boolean" + }, + "labels": { + "additionalProperties": { + "type": "string" + }, + "description": "Additional service labels.", + "title": "Labels", + "type": "object" + }, + "type": { + "allOf": [ + { + "$ref": "#/$defs/ServiceType" + } + ], + "default": "ClusterIP", + "description": "Service type." + } + }, + "title": "ServiceConfig", + "type": "object" + }, + "ServiceType": { + "description": "Represents the different Kubernetes service types.\n\nhttps://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types", + "enum": [ + "ClusterIP", + "NodePort", + "LoadBalancer", + "ExternalName" + ], + "title": "ServiceType", + "type": "string" + }, + "StreamsApp": { + "additionalProperties": true, + "description": "StreamsApp component that configures a streams-bootstrap app.", + "properties": { + "from": { + "anyOf": [ + { + "$ref": "#/$defs/FromSection" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Topic(s) and/or components from which the component will read input", + "title": "From" + }, + "name": { + "description": "Component name", + "title": "Name", "type": "string" }, "namespace": { @@ -1524,6 +2061,11 @@ "additionalProperties": true, "description": "streams-bootstrap app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", "properties": { + "affinity": { + "description": "Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity.", + "title": "Affinity", + "type": "object" + }, "autoscaling": { "anyOf": [ { @@ -1536,6 +2078,71 @@ "default": null, "description": "Kubernetes event-driven autoscaling config" }, + "commandLine": { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "boolean" + }, + { + "type": "integer" + } + ] + }, + "description": "Map of command line arguments passed to the streams app.", + "title": "Commandline", + "type": "object" + }, + "configurationEnvPrefix": { + "default": "APP", + "description": "Prefix for environment variables to use that should be parsed as command line arguments.", + "title": "Configurationenvprefix", + "type": "string" + }, + "env": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom environment variables.", + "title": "Env", + "type": "object" + }, + "files": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).", + "title": "Files", + "type": "object" + }, + "image": { + "description": "Docker image of the Kafka producer app.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "allOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + } + ], + "default": "Always", + "description": "Docker image pull policy." + }, + "imagePullSecrets": { + "description": "", + "items": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "title": "Imagepullsecrets", + "type": "array" + }, "imageTag": { "default": "latest", "description": "Docker image tag of the streams-bootstrap app.", @@ -1543,6 +2150,22 @@ "title": "Imagetag", "type": "string" }, + "javaOptions": { + "allOf": [ + { + "$ref": "#/$defs/JavaOptions" + } + ], + "description": "" + }, + "jmx": { + "allOf": [ + { + "$ref": "#/$defs/JMXConfig" + } + ], + "description": "Configuration for JMX Exporter." + }, "kafka": { "allOf": [ { @@ -1551,6 +2174,11 @@ ], "description": "streams-bootstrap kafka section" }, + "livenessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Livenessprobe", + "type": "object" + }, "nameOverride": { "anyOf": [ { @@ -1576,16 +2204,115 @@ "size": null, "storage_class": null }, + "description": "Configuration for persistent volume to store the state of the streams app." + }, + "podAnnotations": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom annotations to attach to the pod spec.", + "title": "Podannotations", + "type": "object" + }, + "podLabels": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom labels to attach to the pod spec.", + "title": "Podlabels", + "type": "object" + }, + "ports": { + "description": "", + "items": { + "$ref": "#/$defs/PortConfig" + }, + "title": "Ports", + "type": "array" + }, + "prometheus": { + "allOf": [ + { + "$ref": "#/$defs/PrometheusExporterConfig" + } + ], + "description": "Configuration for Prometheus JMX Exporter." + }, + "readinessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Readinessprobe", + "type": "object" + }, + "resources": { + "allOf": [ + { + "$ref": "#/$defs/Resources" + } + ], + "default": { + "limits": { + "cpu": "300m", + "memory": "2G" + }, + "requests": { + "cpu": "100m", + "memory": "500Mi" + } + }, + "description": "See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/" + }, + "secretFilesRefs": { + "description": "Mount existing secrets as volumes", + "items": { + "type": "string" + }, + "title": "Secretfilesrefs", + "type": "array" + }, + "secretRefs": { + "description": "Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.", + "title": "Secretrefs", + "type": "object" + }, + "secrets": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.", + "title": "Secrets", + "type": "object" + }, + "service": { + "allOf": [ + { + "$ref": "#/$defs/ServiceConfig" + } + ], "description": "" }, "statefulSet": { "default": false, - "description": "Whether to use a Statefulset instead of a Deployment to deploy the streams app.", + "description": "Whether to use a StatefulSet instead of a Deployment to deploy the streams app.", "title": "Statefulset", "type": "boolean" + }, + "terminationGracePeriodSeconds": { + "default": 300, + "description": "Delay for graceful application shutdown in seconds: https://pracucci.com/graceful-shutdown-of-kubernetes-pods.html", + "title": "Terminationgraceperiodseconds", + "type": "integer" + }, + "tolerations": { + "description": "Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.", + "items": { + "$ref": "#/$defs/Toleration" + }, + "title": "Tolerations", + "type": "array" } }, "required": [ + "image", "kafka" ], "title": "StreamsAppValues", @@ -1817,6 +2544,76 @@ "additionalProperties": true, "description": "Base value class for all streams bootstrap related components.", "properties": { + "affinity": { + "description": "Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity.", + "title": "Affinity", + "type": "object" + }, + "commandLine": { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "boolean" + }, + { + "type": "integer" + } + ] + }, + "description": "Map of command line arguments passed to the streams app.", + "title": "Commandline", + "type": "object" + }, + "configurationEnvPrefix": { + "default": "APP", + "description": "Prefix for environment variables to use that should be parsed as command line arguments.", + "title": "Configurationenvprefix", + "type": "string" + }, + "env": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom environment variables.", + "title": "Env", + "type": "object" + }, + "files": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).", + "title": "Files", + "type": "object" + }, + "image": { + "description": "Docker image of the Kafka producer app.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "allOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + } + ], + "default": "Always", + "description": "Docker image pull policy." + }, + "imagePullSecrets": { + "description": "", + "items": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "title": "Imagepullsecrets", + "type": "array" + }, "imageTag": { "default": "latest", "description": "Docker image tag of the streams-bootstrap app.", @@ -1824,6 +2621,14 @@ "title": "Imagetag", "type": "string" }, + "javaOptions": { + "allOf": [ + { + "$ref": "#/$defs/JavaOptions" + } + ], + "description": "" + }, "kafka": { "allOf": [ { @@ -1832,6 +2637,11 @@ ], "description": "Kafka configuration for the streams-bootstrap app." }, + "livenessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Livenessprobe", + "type": "object" + }, "nameOverride": { "anyOf": [ { @@ -1845,9 +2655,94 @@ "default": null, "description": "Helm chart name override, assigned automatically", "title": "Nameoverride" + }, + "podAnnotations": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom annotations to attach to the pod spec.", + "title": "Podannotations", + "type": "object" + }, + "podLabels": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom labels to attach to the pod spec.", + "title": "Podlabels", + "type": "object" + }, + "ports": { + "description": "", + "items": { + "$ref": "#/$defs/PortConfig" + }, + "title": "Ports", + "type": "array" + }, + "readinessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Readinessprobe", + "type": "object" + }, + "resources": { + "allOf": [ + { + "$ref": "#/$defs/Resources" + } + ], + "default": { + "limits": { + "cpu": "300m", + "memory": "2G" + }, + "requests": { + "cpu": "100m", + "memory": "500Mi" + } + }, + "description": "See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/" + }, + "secretFilesRefs": { + "description": "Mount existing secrets as volumes", + "items": { + "type": "string" + }, + "title": "Secretfilesrefs", + "type": "array" + }, + "secretRefs": { + "description": "Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.", + "title": "Secretrefs", + "type": "object" + }, + "secrets": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.", + "title": "Secrets", + "type": "object" + }, + "service": { + "allOf": [ + { + "$ref": "#/$defs/ServiceConfig" + } + ], + "description": "" + }, + "tolerations": { + "description": "Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.", + "items": { + "$ref": "#/$defs/Toleration" + }, + "title": "Tolerations", + "type": "array" } }, "required": [ + "image", "kafka" ], "title": "StreamsBootstrapValues", @@ -1879,6 +2774,65 @@ "title": "ToSection", "type": "object" }, + "Toleration": { + "description": "Represents the different Kubernetes tolerations.\nhttps://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/", + "properties": { + "effect": { + "allOf": [ + { + "$ref": "#/$defs/Effects" + } + ], + "description": "The effect to tolerate." + }, + "key": { + "description": "The key that the toleration applies to.", + "title": "Key", + "type": "string" + }, + "operator": { + "allOf": [ + { + "$ref": "#/$defs/Operation" + } + ], + "default": "Equal", + "description": "The operator ('Exists' or 'Equal')." + }, + "toleration_seconds": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "The duration for which the toleration is valid.", + "title": "Toleration Seconds" + }, + "value": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "The value to match for the key.", + "title": "Value" + } + }, + "required": [ + "key", + "effect" + ], + "title": "Toleration", + "type": "object" + }, "TopicConfig": { "additionalProperties": false, "description": "Configure an output topic.", diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 29bcc0dc6..df0ec7869 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -1,5 +1,14 @@ { "$defs": { + "Effects": { + "enum": [ + "NoExecute", + "NoSchedule", + "PreferNoSchedule" + ], + "title": "Effects", + "type": "string" + }, "FromSection": { "additionalProperties": false, "description": "Holds multiple input topics.", @@ -209,6 +218,16 @@ "title": "HelmRepoConfig", "type": "object" }, + "ImagePullPolicy": { + "description": "Represents the different Kubernetes image pull policies.\n\nhttps://kubernetes.io/docs/concepts/containers/images/#image-pull-policy", + "enum": [ + "Always", + "IfNotPresent", + "Never" + ], + "title": "ImagePullPolicy", + "type": "string" + }, "InputTopicTypes": { "description": "Input topic types.\n\n- INPUT: input topic\n- PATTERN: extra-topic-pattern or input-topic-pattern", "enum": [ @@ -218,6 +237,51 @@ "title": "InputTopicTypes", "type": "string" }, + "JMXConfig": { + "description": "JMX configuration options.", + "properties": { + "metricRules": { + "default": [ + ".*" + ], + "description": "List of JMX metric rules.", + "items": { + "type": "string" + }, + "title": "Metricrules", + "type": "array" + }, + "port": { + "default": 5555, + "description": "The jmx port which JMX style metrics are exposed.", + "title": "Port", + "type": "integer" + } + }, + "title": "JMXConfig", + "type": "object" + }, + "JavaOptions": { + "description": "JVM configuration options.", + "properties": { + "maxRAMPercentage": { + "default": 75, + "description": "Sets the maximum amount of memory that the JVM may use for the Java heap before applying ergonomics heuristics as a percentage of the maximum amount determined as described in the -XX:MaxRAM option", + "title": "Maxrampercentage", + "type": "integer" + }, + "others": { + "description": "List of Java VM options passed to the streams app.", + "items": { + "type": "string" + }, + "title": "Others", + "type": "array" + } + }, + "title": "JavaOptions", + "type": "object" + }, "KafkaConnectorConfig": { "additionalProperties": true, "additional_properties": { @@ -444,6 +508,14 @@ "title": "KafkaSourceConnector", "type": "object" }, + "Operation": { + "enum": [ + "Exists", + "Equal" + ], + "title": "Operation", + "type": "string" + }, "OutputTopicTypes": { "description": "Types of output topic.\n\n- OUTPUT: output topic\n- ERROR: error topic", "enum": [ @@ -492,6 +564,56 @@ "title": "PersistenceConfig", "type": "object" }, + "PortConfig": { + "description": "Base class for the port configuration of the Kafka Streams application.", + "properties": { + "containerPort": { + "description": "", + "title": "Containerport", + "type": "integer" + }, + "name": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Services can reference port by name (optional).", + "title": "Name" + }, + "schema": { + "allOf": [ + { + "$ref": "#/$defs/ProtocolSchema" + } + ], + "default": "TCP", + "description": "Protocol for port. Must be UDP, TCP, or SCTP." + }, + "servicePort": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Number of the port of the service (optional)", + "title": "Serviceport" + } + }, + "required": [ + "containerPort" + ], + "title": "PortConfig", + "type": "object" + }, "ProducerApp": { "additionalProperties": true, "description": "Producer component.\nThis producer holds configuration to use as values for the streams-bootstrap producer Helm chart. Note that the producer does not support error topics.", @@ -714,6 +836,94 @@ "additionalProperties": true, "description": "Settings specific to producers.", "properties": { + "affinity": { + "description": "Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity.", + "title": "Affinity", + "type": "object" + }, + "backoffLimit": { + "default": 6, + "description": "The number of times to restart an unsuccessful job.", + "title": "Backofflimit", + "type": "integer" + }, + "commandLine": { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "boolean" + }, + { + "type": "integer" + } + ] + }, + "description": "Map of command line arguments passed to the streams app.", + "title": "Commandline", + "type": "object" + }, + "configurationEnvPrefix": { + "default": "APP", + "description": "Prefix for environment variables to use that should be parsed as command line arguments.", + "title": "Configurationenvprefix", + "type": "string" + }, + "deployment": { + "default": false, + "description": "Deploy the producer as a Kubernetes Deployment (thereby ignoring Job-related configurations)", + "title": "Deployment", + "type": "boolean" + }, + "env": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom environment variables.", + "title": "Env", + "type": "object" + }, + "failedJobsHistoryLimit": { + "default": 1, + "description": "The number of unsuccessful jobs to retain.", + "title": "Failedjobshistorylimit", + "type": "integer" + }, + "files": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).", + "title": "Files", + "type": "object" + }, + "image": { + "description": "Docker image of the Kafka producer app.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "allOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + } + ], + "default": "Always", + "description": "Docker image pull policy." + }, + "imagePullSecrets": { + "description": "", + "items": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "title": "Imagepullsecrets", + "type": "array" + }, "imageTag": { "default": "latest", "description": "Docker image tag of the streams-bootstrap app.", @@ -721,6 +931,14 @@ "title": "Imagetag", "type": "string" }, + "javaOptions": { + "allOf": [ + { + "$ref": "#/$defs/JavaOptions" + } + ], + "description": "" + }, "kafka": { "allOf": [ { @@ -729,6 +947,11 @@ ], "description": "Kafka Streams settings" }, + "livenessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Livenessprobe", + "type": "object" + }, "nameOverride": { "anyOf": [ { @@ -742,9 +965,134 @@ "default": null, "description": "Helm chart name override, assigned automatically", "title": "Nameoverride" + }, + "podAnnotations": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom annotations to attach to the pod spec.", + "title": "Podannotations", + "type": "object" + }, + "podLabels": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom labels to attach to the pod spec.", + "title": "Podlabels", + "type": "object" + }, + "ports": { + "description": "", + "items": { + "$ref": "#/$defs/PortConfig" + }, + "title": "Ports", + "type": "array" + }, + "readinessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Readinessprobe", + "type": "object" + }, + "resources": { + "allOf": [ + { + "$ref": "#/$defs/Resources" + } + ], + "default": { + "limits": { + "cpu": "300m", + "memory": "2G" + }, + "requests": { + "cpu": "100m", + "memory": "500Mi" + } + }, + "description": "See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/" + }, + "restartPolicy": { + "allOf": [ + { + "$ref": "#/$defs/RestartPolicy" + } + ], + "default": "OnFailure", + "description": "See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy" + }, + "schedule": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Cron expression to denote a schedule this producer app should be run on. It will then be deployed as a CronJob instead of a Job.", + "title": "Schedule" + }, + "secretFilesRefs": { + "description": "Mount existing secrets as volumes", + "items": { + "type": "string" + }, + "title": "Secretfilesrefs", + "type": "array" + }, + "secretRefs": { + "description": "Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.", + "title": "Secretrefs", + "type": "object" + }, + "secrets": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.", + "title": "Secrets", + "type": "object" + }, + "service": { + "allOf": [ + { + "$ref": "#/$defs/ServiceConfig" + } + ], + "description": "" + }, + "successfulJobsHistoryLimit": { + "default": 1, + "description": "The number of successful jobs to retain.", + "title": "Successfuljobshistorylimit", + "type": "integer" + }, + "suspend": { + "default": false, + "description": "Whether to suspend the execution of the cron job.", + "title": "Suspend", + "type": "boolean" + }, + "tolerations": { + "description": "Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.", + "items": { + "$ref": "#/$defs/Toleration" + }, + "title": "Tolerations", + "type": "array" + }, + "ttlSecondsAfterFinished": { + "default": 100, + "description": "See https://kubernetes.io/docs/concepts/workloads/controllers/ttlafterfinished/#ttl-after-finished-controller", + "title": "Ttlsecondsafterfinished", + "type": "integer" } }, "required": [ + "image", "kafka" ], "title": "ProducerAppValues", @@ -850,6 +1198,89 @@ "title": "ProducerStreamsConfig", "type": "object" }, + "PrometheusExporterConfig": { + "description": "Prometheus JMX exporter configuration.", + "properties": { + "jmx": { + "allOf": [ + { + "$ref": "#/$defs/PrometheusJMXExporterConfig" + } + ], + "description": "The prometheus JMX exporter configuration." + } + }, + "title": "PrometheusExporterConfig", + "type": "object" + }, + "PrometheusJMXExporterConfig": { + "description": "Prometheus JMX exporter configuration.", + "properties": { + "enabled": { + "default": true, + "description": "Whether to install Prometheus JMX Exporter as a sidecar container and expose JMX metrics to Prometheus.", + "title": "Enabled", + "type": "boolean" + }, + "image": { + "default": "solsson/kafka-prometheus-jmx-exporter@sha256", + "description": "Docker Image for Prometheus JMX Exporter container.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "allOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + } + ], + "default": "IfNotPresent", + "description": "Docker Image Pull Policy for Prometheus JMX Exporter container." + }, + "imageTag": { + "default": "6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143", + "description": "Docker Image Tag for Prometheus JMX Exporter container.", + "title": "Imagetag", + "type": "string" + }, + "port": { + "default": 5556, + "description": "JMX Exporter Port which exposes metrics in Prometheus format for scraping.", + "title": "Port", + "type": "integer" + }, + "resources": { + "allOf": [ + { + "$ref": "#/$defs/Resources" + } + ], + "default": { + "limits": { + "cpu": "300m", + "memory": "2G" + }, + "requests": { + "cpu": "100m", + "memory": "500Mi" + } + }, + "description": "JMX Exporter resources configuration." + } + }, + "title": "PrometheusJMXExporterConfig", + "type": "object" + }, + "ProtocolSchema": { + "description": "Represents the different Kubernetes protocols.\n\nhttps://kubernetes.io/docs/reference/networking/service-protocols/", + "enum": [ + "TCP", + "UDP", + "SCTP" + ], + "title": "ProtocolSchema", + "type": "string" + }, "RepoAuthFlags": { "description": "Authorisation-related flags for `helm repo`.", "properties": { @@ -917,6 +1348,112 @@ "title": "RepoAuthFlags", "type": "object" }, + "ResourceDefinition": { + "description": "Model representing the 'limits' or `request` section of Kubernetes resource specifications.", + "properties": { + "cpu": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "integer" + } + ], + "description": "The maximum amount of CPU a container can use, expressed in milli CPUs (e.g., '300m').", + "title": "Cpu" + }, + "memory": { + "description": "The maximum amount of memory a container can use, with valid units such as 'Mi' or 'Gi' (e.g., '2G').", + "pattern": "^\\d+([EPTGMk]|Ei|Pi|Ti|Gi|Mi|Ki)?$", + "title": "Memory", + "type": "string" + } + }, + "required": [ + "cpu", + "memory" + ], + "title": "ResourceDefinition", + "type": "object" + }, + "Resources": { + "description": "Model representing the resource specifications for a Kubernetes container.", + "properties": { + "limits": { + "allOf": [ + { + "$ref": "#/$defs/ResourceDefinition" + } + ], + "description": "The maximum resource limits for the container." + }, + "requests": { + "allOf": [ + { + "$ref": "#/$defs/ResourceDefinition" + } + ], + "description": "The minimum resource requirements for the container." + } + }, + "required": [ + "requests", + "limits" + ], + "title": "Resources", + "type": "object" + }, + "RestartPolicy": { + "enum": [ + "Always", + "OnFailure", + "Never" + ], + "title": "RestartPolicy", + "type": "string" + }, + "ServiceConfig": { + "description": "Base model for configuring a service for the Kafka Streams application.", + "properties": { + "enabled": { + "default": false, + "description": "Whether to create a service.", + "title": "Enabled", + "type": "boolean" + }, + "labels": { + "additionalProperties": { + "type": "string" + }, + "description": "Additional service labels.", + "title": "Labels", + "type": "object" + }, + "type": { + "allOf": [ + { + "$ref": "#/$defs/ServiceType" + } + ], + "default": "ClusterIP", + "description": "Service type." + } + }, + "title": "ServiceConfig", + "type": "object" + }, + "ServiceType": { + "description": "Represents the different Kubernetes service types.\n\nhttps://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types", + "enum": [ + "ClusterIP", + "NodePort", + "LoadBalancer", + "ExternalName" + ], + "title": "ServiceType", + "type": "string" + }, "StreamsApp": { "additionalProperties": true, "description": "StreamsApp component that configures a streams-bootstrap app.", @@ -1184,6 +1721,11 @@ "additionalProperties": true, "description": "streams-bootstrap app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", "properties": { + "affinity": { + "description": "Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity.", + "title": "Affinity", + "type": "object" + }, "autoscaling": { "anyOf": [ { @@ -1196,6 +1738,71 @@ "default": null, "description": "Kubernetes event-driven autoscaling config" }, + "commandLine": { + "additionalProperties": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "boolean" + }, + { + "type": "integer" + } + ] + }, + "description": "Map of command line arguments passed to the streams app.", + "title": "Commandline", + "type": "object" + }, + "configurationEnvPrefix": { + "default": "APP", + "description": "Prefix for environment variables to use that should be parsed as command line arguments.", + "title": "Configurationenvprefix", + "type": "string" + }, + "env": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom environment variables.", + "title": "Env", + "type": "object" + }, + "files": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).", + "title": "Files", + "type": "object" + }, + "image": { + "description": "Docker image of the Kafka producer app.", + "title": "Image", + "type": "string" + }, + "imagePullPolicy": { + "allOf": [ + { + "$ref": "#/$defs/ImagePullPolicy" + } + ], + "default": "Always", + "description": "Docker image pull policy." + }, + "imagePullSecrets": { + "description": "", + "items": { + "additionalProperties": { + "type": "string" + }, + "type": "object" + }, + "title": "Imagepullsecrets", + "type": "array" + }, "imageTag": { "default": "latest", "description": "Docker image tag of the streams-bootstrap app.", @@ -1203,6 +1810,22 @@ "title": "Imagetag", "type": "string" }, + "javaOptions": { + "allOf": [ + { + "$ref": "#/$defs/JavaOptions" + } + ], + "description": "" + }, + "jmx": { + "allOf": [ + { + "$ref": "#/$defs/JMXConfig" + } + ], + "description": "Configuration for JMX Exporter." + }, "kafka": { "allOf": [ { @@ -1211,6 +1834,11 @@ ], "description": "streams-bootstrap kafka section" }, + "livenessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Livenessprobe", + "type": "object" + }, "nameOverride": { "anyOf": [ { @@ -1236,16 +1864,115 @@ "size": null, "storage_class": null }, + "description": "Configuration for persistent volume to store the state of the streams app." + }, + "podAnnotations": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom annotations to attach to the pod spec.", + "title": "Podannotations", + "type": "object" + }, + "podLabels": { + "additionalProperties": { + "type": "string" + }, + "description": "Map of custom labels to attach to the pod spec.", + "title": "Podlabels", + "type": "object" + }, + "ports": { + "description": "", + "items": { + "$ref": "#/$defs/PortConfig" + }, + "title": "Ports", + "type": "array" + }, + "prometheus": { + "allOf": [ + { + "$ref": "#/$defs/PrometheusExporterConfig" + } + ], + "description": "Configuration for Prometheus JMX Exporter." + }, + "readinessProbe": { + "description": "See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core", + "title": "Readinessprobe", + "type": "object" + }, + "resources": { + "allOf": [ + { + "$ref": "#/$defs/Resources" + } + ], + "default": { + "limits": { + "cpu": "300m", + "memory": "2G" + }, + "requests": { + "cpu": "100m", + "memory": "500Mi" + } + }, + "description": "See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/" + }, + "secretFilesRefs": { + "description": "Mount existing secrets as volumes", + "items": { + "type": "string" + }, + "title": "Secretfilesrefs", + "type": "array" + }, + "secretRefs": { + "description": "Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.", + "title": "Secretrefs", + "type": "object" + }, + "secrets": { + "additionalProperties": { + "type": "string" + }, + "description": "Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.", + "title": "Secrets", + "type": "object" + }, + "service": { + "allOf": [ + { + "$ref": "#/$defs/ServiceConfig" + } + ], "description": "" }, "statefulSet": { "default": false, - "description": "Whether to use a Statefulset instead of a Deployment to deploy the streams app.", + "description": "Whether to use a StatefulSet instead of a Deployment to deploy the streams app.", "title": "Statefulset", "type": "boolean" + }, + "terminationGracePeriodSeconds": { + "default": 300, + "description": "Delay for graceful application shutdown in seconds: https://pracucci.com/graceful-shutdown-of-kubernetes-pods.html", + "title": "Terminationgraceperiodseconds", + "type": "integer" + }, + "tolerations": { + "description": "Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.", + "items": { + "$ref": "#/$defs/Toleration" + }, + "title": "Tolerations", + "type": "array" } }, "required": [ + "image", "kafka" ], "title": "StreamsAppValues", @@ -1277,6 +2004,65 @@ "title": "ToSection", "type": "object" }, + "Toleration": { + "description": "Represents the different Kubernetes tolerations.\nhttps://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/", + "properties": { + "effect": { + "allOf": [ + { + "$ref": "#/$defs/Effects" + } + ], + "description": "The effect to tolerate." + }, + "key": { + "description": "The key that the toleration applies to.", + "title": "Key", + "type": "string" + }, + "operator": { + "allOf": [ + { + "$ref": "#/$defs/Operation" + } + ], + "default": "Equal", + "description": "The operator ('Exists' or 'Equal')." + }, + "toleration_seconds": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "The duration for which the toleration is valid.", + "title": "Toleration Seconds" + }, + "value": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "description": "The value to match for the key.", + "title": "Value" + } + }, + "required": [ + "key", + "effect" + ], + "title": "Toleration", + "type": "object" + }, "TopicConfig": { "additionalProperties": false, "description": "Configure an output topic.", diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py index 6e966941a..5744c892c 100644 --- a/kpops/component_handlers/kafka_connect/connect_wrapper.py +++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py @@ -138,7 +138,7 @@ async def validate_connector_config( """Validate connector config using the given configuration. :param connector_config: Configuration parameters for the connector. - :raises KafkaConnectError: Kafka Konnect error + :raises KafkaConnectError: Kafka Connect error :return: List of all found errors """ response = await self._client.put( diff --git a/kpops/components/common/kubernetes_model.py b/kpops/components/common/kubernetes_model.py new file mode 100644 index 000000000..b16dc8b92 --- /dev/null +++ b/kpops/components/common/kubernetes_model.py @@ -0,0 +1,111 @@ +import enum + +from pydantic import Field + +from kpops.utils.docstring import describe_attr +from kpops.utils.pydantic import DescConfigModel + +# Matches plain integer or numbers with valid suffixes: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory +MEMORY_PATTERN = r"^\d+([EPTGMk]|Ei|Pi|Ti|Gi|Mi|Ki)?$" + + +class ServiceType(enum.Enum): + """Represents the different Kubernetes service types. + + https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types + """ + + CLUSTER_IP = "ClusterIP" + NODE_PORT = "NodePort" + LOAD_BALANCER = "LoadBalancer" + EXTERNAL_NAME = "ExternalName" + + +class ProtocolSchema(enum.Enum): + """Represents the different Kubernetes protocols. + + https://kubernetes.io/docs/reference/networking/service-protocols/ + """ + + TCP = "TCP" + UDP = "UDP" + SCTP = "SCTP" + + +class ImagePullPolicy(enum.Enum): + """Represents the different Kubernetes image pull policies. + + https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy + """ + + ALWAYS = "Always" + IF_NOT_PRESENT = "IfNotPresent" + NEVER = "Never" + + +class Operation(enum.Enum): + EXISTS = "Exists" + EQUAL = "Equal" + + +class Effects(enum.Enum): + NO_EXECUTE = "NoExecute" + NO_SCHEDULE = "NoSchedule" + PREFER_NO_SCHEDULE = "PreferNoSchedule" + + +class RestartPolicy(enum.Enum): + ALWAYS = "Always" + ON_FAILURE = "OnFailure" + NEVER = "Never" + + +class Toleration(DescConfigModel): + """Represents the different Kubernetes tolerations. + + https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/ + + :param key: The key that the toleration applies to. + :param operator: The operator ('Exists' or 'Equal'). + :param value: The value to match for the key. + :param effect: The effect to tolerate. + :param toleration_seconds: The duration for which the toleration is valid. + """ + + key: str = Field(default=..., description=describe_attr("key", __doc__)) + + operator: Operation = Field( + default=Operation.EQUAL, description=describe_attr("operator", __doc__) + ) + + effect: Effects = Field(default=..., description=describe_attr("effect", __doc__)) + + value: str | None = Field(default=None, description=describe_attr("value", __doc__)) + + toleration_seconds: int | None = Field( + default=None, description=describe_attr("toleration_seconds", __doc__) + ) + + +class ResourceDefinition(DescConfigModel): + """Model representing the 'limits' or `request` section of Kubernetes resource specifications. + + :param cpu: The maximum amount of CPU a container can use, expressed in milli CPUs (e.g., '300m'). + :param memory: The maximum amount of memory a container can use, with valid units such as 'Mi' or 'Gi' (e.g., '2G'). + """ + + cpu: str | int = Field(pattern=r"^\d+m$", description=describe_attr("cpu", __doc__)) + memory: str = Field( + pattern=MEMORY_PATTERN, description=describe_attr("memory", __doc__) + ) + + +class Resources(DescConfigModel): + """Model representing the resource specifications for a Kubernetes container. + + :param requests: The minimum resource requirements for the container. + :param limits: The maximum resource limits for the container. + """ + + requests: ResourceDefinition = Field(description=describe_attr("requests", __doc__)) + limits: ResourceDefinition = Field(description=describe_attr("limits", __doc__)) diff --git a/kpops/components/streams_bootstrap/model.py b/kpops/components/streams_bootstrap/model.py index 7c4390c02..2340cf8c3 100644 --- a/kpops/components/streams_bootstrap/model.py +++ b/kpops/components/streams_bootstrap/model.py @@ -6,6 +6,14 @@ from pydantic import AliasChoices, ConfigDict, Field from kpops.components.base_components.helm_app import HelmAppValues +from kpops.components.common.kubernetes_model import ( + ImagePullPolicy, + ProtocolSchema, + ResourceDefinition, + Resources, + ServiceType, + Toleration, +) from kpops.components.common.topic import KafkaTopic, KafkaTopicStr from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( @@ -19,23 +27,207 @@ IMAGE_TAG_PATTERN = r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$" +class PortConfig(CamelCaseConfigModel, DescConfigModel): + """Base class for the port configuration of the Kafka Streams application. + + :param container_port: Number of the port to expose. + :param name: Services can reference port by name (optional). + :param schema: Protocol for port. Must be UDP, TCP, or SCTP. + :param service_port: Number of the port of the service (optional) + """ + + container_port: int = Field( + description=describe_attr("ports", __doc__), + ) + name: str | None = Field( + default=None, + description=describe_attr("name", __doc__), + ) + schema: ProtocolSchema = Field( + default=ProtocolSchema.TCP, + description=describe_attr("schema", __doc__), + ) + service_port: int | None = Field( + default=None, + description=describe_attr("service_port", __doc__), + ) + + +class ServiceConfig(CamelCaseConfigModel, DescConfigModel): + """Base model for configuring a service for the Kafka Streams application. + + :param enabled: Whether to create a service. + :param labels: Additional service labels. + :param type: Service type. + """ + + enabled: bool = Field( + default=False, + description=describe_attr("enabled", __doc__), + ) + labels: dict[str, str] = Field( + default_factory=dict, + description=describe_attr("labels", __doc__), + ) + type: ServiceType = Field( + default=ServiceType.CLUSTER_IP, + description=describe_attr("type", __doc__), + ) + + +class JavaOptions(CamelCaseConfigModel, DescConfigModel): + """JVM configuration options. + + :param max_RAM_percentage: Sets the maximum amount of memory that the JVM may use for the Java heap before applying ergonomics heuristics as a percentage of the maximum amount determined as described in the -XX:MaxRAM option + :param others: List of Java VM options passed to the streams app. + """ + + max_RAM_percentage: int = Field( + default=75, + description=describe_attr("max_RAM_percentage", __doc__), + ) + others: list[str] = Field( + default_factory=list, + description=describe_attr("others", __doc__), + ) + + class StreamsBootstrapValues(HelmAppValues): """Base value class for all streams bootstrap related components. + :param image: Docker image of the Kafka producer app. :param image_tag: Docker image tag of the streams-bootstrap app. + :param image_pull_policy: Docker image pull policy. + :param image_pull_secrets: Secrets to be used for private registries. :param kafka: Kafka configuration for the streams-bootstrap app. + :param resources: See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + :param configuration_env_prefix: Prefix for environment variables to use that should be parsed as command line arguments. + :param command_line: Map of command line arguments passed to the streams app. + :param env: Custom environment variables. + :param secrets: Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration. + :param secret_refs: Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key. + :param secret_files_refs: Mount existing secrets as volumes + :param files: Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file). + :param pod_annotations: Map of custom annotations to attach to the pod spec. + :param pod_labels: Map of custom labels to attach to the pod spec. + :param liveness_probe: See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core + :param readiness_probe: See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core + :param affinity: Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity. + :param tolerations: Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling. """ + image: str = Field( + description=describe_attr("image", __doc__), + ) + image_tag: str = Field( default="latest", pattern=IMAGE_TAG_PATTERN, description=describe_attr("image_tag", __doc__), ) + image_pull_policy: ImagePullPolicy = Field( + default=ImagePullPolicy.ALWAYS, + description=describe_attr("image_pull_policy", __doc__), + ) + + image_pull_secrets: list[dict[str, str]] = Field( + default_factory=list, + description=describe_attr("image_pull_secret", __doc__), + ) + kafka: KafkaConfig = Field( description=describe_attr("kafka", __doc__), ) + resources: Resources = Field( + default=Resources( + requests=ResourceDefinition(cpu="100m", memory="500Mi"), + limits=ResourceDefinition(cpu="300m", memory="2G"), + ), + description=describe_attr("resources", __doc__), + ) + + ports: list[PortConfig] = Field( + default_factory=list, + description=describe_attr("ports", __doc__), + ) + + service: ServiceConfig = Field( + default_factory=ServiceConfig, + description=describe_attr("service", __doc__), + ) + + configuration_env_prefix: str = Field( + default="APP", + description=describe_attr("configuration_env_prefix", __doc__), + ) + + command_line: dict[str, str | bool | int] = Field( + default_factory=dict, + description=describe_attr("command_line", __doc__), + ) + + env: dict[str, str] = Field( + default_factory=dict, + description=describe_attr("env", __doc__), + ) + + secrets: dict[str, str] = Field( + default_factory=dict, + description=describe_attr("secrets", __doc__), + ) + + secret_refs: dict[str, Any] = Field( + default_factory=dict, + description=describe_attr("secret_refs", __doc__), + ) + + secret_files_refs: list[str] = Field( + default_factory=list, + description=describe_attr("secret_files_refs", __doc__), + ) + + files: dict[str, str] = Field( + default_factory=dict, + description=describe_attr("files", __doc__), + ) + + java_options: JavaOptions = Field( + default_factory=JavaOptions, + description=describe_attr("java_options", __doc__), + ) + + pod_annotations: dict[str, str] = Field( + default_factory=dict, + description=describe_attr("pod_annotations", __doc__), + ) + + pod_labels: dict[str, str] = Field( + default_factory=dict, + description=describe_attr("pod_labels", __doc__), + ) + + liveness_probe: dict[str, Any] = Field( + default_factory=dict, + description=describe_attr("liveness_probe", __doc__), + ) + + readiness_probe: dict[str, Any] = Field( + default_factory=dict, + description=describe_attr("readiness_probe", __doc__), + ) + + affinity: dict[str, Any] = Field( + default_factory=dict, + description=describe_attr("affinity", __doc__), + ) + + tolerations: list[Toleration] = Field( + default_factory=list, + description=describe_attr("tolerations", __doc__), + ) + class KafkaConfig(CamelCaseConfigModel, DescConfigModel): """Kafka Streams config. diff --git a/kpops/components/streams_bootstrap/producer/model.py b/kpops/components/streams_bootstrap/producer/model.py index acf1019c4..ef1b48eb5 100644 --- a/kpops/components/streams_bootstrap/producer/model.py +++ b/kpops/components/streams_bootstrap/producer/model.py @@ -1,5 +1,8 @@ -from pydantic import ConfigDict, Field +from croniter import croniter +from pydantic import ConfigDict, Field, field_validator +from kpops.api.exception import ValidationError +from kpops.components.common.kubernetes_model import RestartPolicy from kpops.components.streams_bootstrap.model import ( KafkaConfig, StreamsBootstrapValues, @@ -15,8 +18,56 @@ class ProducerAppValues(StreamsBootstrapValues): """Settings specific to producers. :param kafka: Kafka Streams settings + :param deployment: Deploy the producer as a Kubernetes Deployment (thereby ignoring Job-related configurations) + :param restart_policy: See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#restart-policy + :param schedule: Cron expression to denote a schedule this producer app should be run on. It will then be deployed as a CronJob instead of a Job. + :param suspend: Whether to suspend the execution of the cron job. + :param successful_jobs_history_limit: The number of successful jobs to retain. + :param failed_jobs_history_limit: The number of unsuccessful jobs to retain. + :param backoff_limit: The number of times to restart an unsuccessful job. + :param ttl_seconds_after_finished: See https://kubernetes.io/docs/concepts/workloads/controllers/ttlafterfinished/#ttl-after-finished-controller """ kafka: ProducerConfig = Field(description=describe_attr("kafka", __doc__)) + deployment: bool = Field( + default=False, description=describe_attr("deployment", __doc__) + ) + + restart_policy: RestartPolicy = Field( + default=RestartPolicy.ON_FAILURE, + description=describe_attr("restart_policy", __doc__), + ) + + schedule: str | None = Field( + default=None, description=describe_attr("schedule", __doc__) + ) + + suspend: bool = Field(default=False, description=describe_attr("suspend", __doc__)) + + successful_jobs_history_limit: int = Field( + default=1, description=describe_attr("successful_jobs_history_limit", __doc__) + ) + + failed_jobs_history_limit: int = Field( + default=1, description=describe_attr("failed_jobs_history_limit", __doc__) + ) + + backoff_limit: int = Field( + default=6, description=describe_attr("backoff_limit", __doc__) + ) + + ttl_seconds_after_finished: int = Field( + default=100, description=describe_attr("ttl_seconds_after_finished", __doc__) + ) + model_config = ConfigDict(extra="allow") + + @field_validator("schedule") + @classmethod + def schedule_cron_validator(cls, schedule: str) -> str: + """Ensure that the defined schedule value is valid.""" + if schedule and not croniter.is_valid(schedule): + msg = f"The schedule field '{schedule}' must be a valid cron expression." + raise ValidationError(msg) + return schedule diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 36ac57f5b..88cbbbab9 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -5,6 +5,11 @@ import pydantic from pydantic import BaseModel, ConfigDict, Field +from kpops.components.common.kubernetes_model import ( + ImagePullPolicy, + ResourceDefinition, + Resources, +) from kpops.components.common.topic import KafkaTopic, KafkaTopicStr from kpops.components.streams_bootstrap.model import ( KafkaConfig, @@ -224,6 +229,76 @@ class PersistenceConfig(BaseModel): ) +class PrometheusExporterConfig(CamelCaseConfigModel, DescConfigModel): + """Prometheus JMX exporter configuration. + + :param jmx: The prometheus JMX exporter configuration. + + """ + + class PrometheusJMXExporterConfig(CamelCaseConfigModel, DescConfigModel): + """Prometheus JMX exporter configuration. + + :param enabled: Whether to install Prometheus JMX Exporter as a sidecar container and expose JMX metrics to Prometheus. + :param image: Docker Image for Prometheus JMX Exporter container. + :param image_tag: Docker Image Tag for Prometheus JMX Exporter container. + :param image_pull_policy: Docker Image Pull Policy for Prometheus JMX Exporter container. + :param port: JMX Exporter Port which exposes metrics in Prometheus format for scraping. + :param resources: JMX Exporter resources configuration. + """ + + enabled: bool = Field( + default=True, + description=describe_attr("enabled", __doc__), + ) + image: str = Field( + default="solsson/kafka-prometheus-jmx-exporter@sha256", + description=describe_attr("image", __doc__), + ) + image_tag: str = Field( + default="6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143", + description=describe_attr("image_tag", __doc__), + ) + image_pull_policy: ImagePullPolicy = Field( + default=ImagePullPolicy.IF_NOT_PRESENT, + description=describe_attr("image_pull_policy", __doc__), + ) + port: int = Field( + default=5556, + description=describe_attr("port", __doc__), + ) + resources: Resources = Field( + default=Resources( + requests=ResourceDefinition(cpu="100m", memory="500Mi"), + limits=ResourceDefinition(cpu="300m", memory="2G"), + ), + description=describe_attr("resources", __doc__), + ) + + jmx: PrometheusJMXExporterConfig = Field( + default_factory=PrometheusJMXExporterConfig, + description=describe_attr("jmx", __doc__), + ) + + +class JMXConfig(CamelCaseConfigModel, DescConfigModel): + """JMX configuration options. + + :param port: The jmx port which JMX style metrics are exposed. + :param metric_rules: List of JMX metric rules. + """ + + port: int = Field( + default=5555, + description=describe_attr("port", __doc__), + ) + + metric_rules: list[str] = Field( + default=[".*"], + description=describe_attr("metric_rules", __doc__), + ) + + class StreamsAppValues(StreamsBootstrapValues): """streams-bootstrap app configurations. @@ -231,21 +306,45 @@ class StreamsAppValues(StreamsBootstrapValues): :param kafka: streams-bootstrap kafka section :param autoscaling: Kubernetes event-driven autoscaling config, defaults to None + :param stateful_set: Whether to use a StatefulSet instead of a Deployment to deploy the streams app. + :param persistence: Configuration for persistent volume to store the state of the streams app. + :param prometheus: Configuration for Prometheus JMX Exporter. + :param jmx: Configuration for JMX Exporter. + :param termination_grace_period_seconds: Delay for graceful application shutdown in seconds: https://pracucci.com/graceful-shutdown-of-kubernetes-pods.html """ kafka: StreamsConfig = Field( description=describe_attr("kafka", __doc__), ) + autoscaling: StreamsAppAutoScaling | None = Field( default=None, description=describe_attr("autoscaling", __doc__), ) + stateful_set: bool = Field( default=False, - description="Whether to use a Statefulset instead of a Deployment to deploy the streams app.", + description=describe_attr("stateful_set", __doc__), ) + persistence: PersistenceConfig = Field( default=PersistenceConfig(), description=describe_attr("persistence", __doc__), ) + + prometheus: PrometheusExporterConfig = Field( + default_factory=PrometheusExporterConfig, + description=describe_attr("prometheus", __doc__), + ) + + jmx: JMXConfig = Field( + default_factory=JMXConfig, + description=describe_attr("jmx", __doc__), + ) + + termination_grace_period_seconds: int = Field( + default=300, + description=describe_attr("termination_grace_period_seconds", __doc__), + ) + model_config = ConfigDict(extra="allow") diff --git a/poetry.lock b/poetry.lock index 78add035b..a773e4d0c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -190,6 +190,21 @@ files = [ [package.extras] test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"] +[[package]] +name = "croniter" +version = "3.0.3" +description = "croniter provides iteration for datetime object with cron like format" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6" +files = [ + {file = "croniter-3.0.3-py2.py3-none-any.whl", hash = "sha256:b3bd11f270dc54ccd1f2397b813436015a86d30ffc5a7a9438eec1ed916f2101"}, + {file = "croniter-3.0.3.tar.gz", hash = "sha256:34117ec1741f10a7bd0ec3ad7d8f0eb8fa457a2feb9be32e6a2250e158957668"}, +] + +[package.dependencies] +python-dateutil = "*" +pytz = ">2021.1" + [[package]] name = "dataproperty" version = "1.0.0" @@ -1937,4 +1952,4 @@ watchmedo = ["PyYAML (>=3.10)"] [metadata] lock-version = "2.0" python-versions = ">=3.10, <3.13" -content-hash = "3b1404726419c634df55ab2da6a4d407e52b0cba9fc8f4e44896e34c46461776" +content-hash = "e307ba934678d208018611cc9567ef9bfa779fc7c024308d2c6fc6d01a32cafe" diff --git a/pyproject.toml b/pyproject.toml index e952719c4..9117d87a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ python-schema-registry-client = "^2.4.1" httpx = "^0 >=0.24.1" networkx = "^3.1" lightkube = "^0.15.3" +croniter = "^3.0.3" [tool.poetry.group.dev.dependencies] pytest = "^8.3.2" diff --git a/tests/components/streams_bootstrap/test_producer_app.py b/tests/components/streams_bootstrap/test_producer_app.py index 046fa7f08..67b5e2929 100644 --- a/tests/components/streams_bootstrap/test_producer_app.py +++ b/tests/components/streams_bootstrap/test_producer_app.py @@ -4,6 +4,7 @@ import pytest from pytest_mock import MockerFixture +from kpops.api.exception import ValidationError from kpops.component_handlers import get_handlers from kpops.component_handlers.helm_wrapper.helm import Helm from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags @@ -46,6 +47,7 @@ def producer_app(self) -> ProducerApp: "version": "3.2.1", "namespace": "test-namespace", "values": { + "image": "ProducerApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "clean_schemas": True, @@ -90,6 +92,7 @@ def test_output_topics(self): "namespace": "test-namespace", "values": { "namespace": "test-namespace", + "image": "ProducerApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "to": { @@ -145,6 +148,7 @@ async def test_deploy_order_when_dry_run_is_false( "test-namespace", { "nameOverride": PRODUCER_APP_HELM_NAME_OVERRIDE, + "image": "ProducerApp", "kafka": { "bootstrapServers": "fake-broker:9092", "outputTopic": "producer-app-output-topic", @@ -232,6 +236,7 @@ async def test_should_clean_producer_app( "test-namespace", { "nameOverride": PRODUCER_APP_CLEAN_HELM_NAMEOVERRIDE, + "image": "ProducerApp", "kafka": { "bootstrapServers": "fake-broker:9092", "outputTopic": "producer-app-output-topic", @@ -306,6 +311,7 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea "test-namespace", { "nameOverride": PRODUCER_APP_CLEAN_HELM_NAMEOVERRIDE, + "image": "ProducerApp", "kafka": { "bootstrapServers": "fake-broker:9092", "outputTopic": "producer-app-output-topic", @@ -331,6 +337,7 @@ def test_get_output_topics(self): **{ "namespace": "test-namespace", "values": { + "image": "producer-app", "namespace": "test-namespace", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, @@ -383,6 +390,7 @@ async def test_should_not_deploy_clean_up_when_rest(self, mocker: MockerFixture) **{ "namespace": "test-namespace", "values": { + "image": "registry/producer-app", "imageTag": "2.2.2", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, @@ -433,6 +441,7 @@ async def test_should_deploy_clean_up_job_with_values_in_cluster_when_clean( **{ "namespace": "test-namespace", "values": { + "image": "registry/producer-app", "imageTag": "2.2.2", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, @@ -543,3 +552,23 @@ async def test_clean_should_fall_back_to_local_values_when_validation_of_cluster }, HelmUpgradeInstallFlags(version="3.0.1", wait=True, wait_for_jobs=True), ) + + def test_validate_cron_expression(self): + with pytest.raises(ValidationError): + ProducerApp( + name=PRODUCER_APP_NAME, + **{ + "namespace": "test-namespace", + "values": { + "image": "registry/producer-app", + "imageTag": "2.2.2", + "kafka": {"bootstrapServers": "fake-broker:9092"}, + "schedule": "0 wrong_value 1 * *", # Invalid Cron Expression + }, + "to": { + "topics": { + "test-output-topic": {"type": "output"}, + } + }, + }, + ) diff --git a/tests/components/streams_bootstrap/test_streams_app.py b/tests/components/streams_bootstrap/test_streams_app.py index 9aa982e3c..4412f054f 100644 --- a/tests/components/streams_bootstrap/test_streams_app.py +++ b/tests/components/streams_bootstrap/test_streams_app.py @@ -67,6 +67,7 @@ def streams_app(self) -> StreamsApp: **{ "namespace": "test-namespace", "values": { + "image": "streamsApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "to": { @@ -86,6 +87,7 @@ def stateful_streams_app(self) -> StreamsApp: **{ "namespace": "test-namespace", "values": { + "image": "streamsApp", "statefulSet": True, "persistence": {"enabled": True, "size": "5Gi"}, "kafka": { @@ -144,6 +146,7 @@ def test_set_topics(self): **{ "namespace": "test-namespace", "values": { + "image": "streamsApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "from": { @@ -190,6 +193,7 @@ def test_no_empty_input_topic(self): **{ "namespace": "test-namespace", "values": { + "image": "streamsApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "from": { @@ -262,6 +266,7 @@ def test_set_streams_output_from_to(self): **{ "namespace": "test-namespace", "values": { + "image": "streamsApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "to": { @@ -301,6 +306,7 @@ def test_weave_inputs_from_prev_component(self): **{ "namespace": "test-namespace", "values": { + "image": "streamsApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, }, @@ -338,6 +344,7 @@ async def test_deploy_order_when_dry_run_is_false(self, mocker: MockerFixture): **{ "namespace": "test-namespace", "values": { + "image": "streamsApp", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "to": { @@ -417,6 +424,7 @@ async def test_deploy_order_when_dry_run_is_false(self, mocker: MockerFixture): "test-namespace", { "nameOverride": STREAMS_APP_HELM_NAME_OVERRIDE, + "image": "streamsApp", "kafka": { "bootstrapServers": "fake-broker:9092", "labeledOutputTopics": { @@ -505,6 +513,7 @@ async def test_reset_when_dry_run_is_false( "test-namespace", { "nameOverride": STREAMS_APP_CLEAN_HELM_NAME_OVERRIDE, + "image": "streamsApp", "kafka": { "bootstrapServers": "fake-broker:9092", "outputTopic": "streams-app-output-topic", @@ -571,6 +580,7 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean "test-namespace", { "nameOverride": STREAMS_APP_CLEAN_HELM_NAME_OVERRIDE, + "image": "streamsApp", "kafka": { "bootstrapServers": "fake-broker:9092", "outputTopic": "streams-app-output-topic", @@ -617,6 +627,7 @@ async def test_should_deploy_clean_up_job_with_values_in_cluster_when_reset( **{ "namespace": "test-namespace", "values": { + "image": "registry/streams-app", "imageTag": "2.2.2", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, @@ -696,6 +707,7 @@ async def test_should_deploy_clean_up_job_with_values_in_cluster_when_clean( **{ "namespace": "test-namespace", "values": { + "image": "registry/streams-app", "imageTag": "2.2.2", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, @@ -754,6 +766,7 @@ async def test_get_input_output_topics(self): **{ "namespace": "test-namespace", "values": { + "image": "registry/streams-app", "kafka": {"bootstrapServers": "fake-broker:9092"}, }, "from": { @@ -904,6 +917,7 @@ async def test_stateful_clean_with_dry_run_false( "test-namespace", { "nameOverride": STREAMS_APP_CLEAN_HELM_NAME_OVERRIDE, + "image": "streamsApp", "kafka": { "bootstrapServers": "fake-broker:9092", "outputTopic": "streams-app-output-topic", diff --git a/tests/components/streams_bootstrap/test_streams_bootstrap.py b/tests/components/streams_bootstrap/test_streams_bootstrap.py index a6533d5b6..d2573476b 100644 --- a/tests/components/streams_bootstrap/test_streams_bootstrap.py +++ b/tests/components/streams_bootstrap/test_streams_bootstrap.py @@ -21,9 +21,10 @@ def test_default_configs(self): **{ "namespace": "test-namespace", "values": { + "image": "streamsBootstrap", "kafka": { "bootstrapServers": "localhost:9092", - } + }, }, }, ) @@ -42,6 +43,7 @@ async def test_should_deploy_streams_bootstrap_app(self, mocker: MockerFixture): **{ "namespace": "test-namespace", "values": { + "image": "streamsBootstrap", "imageTag": "1.0.0", "kafka": { "outputTopic": "test", @@ -74,6 +76,7 @@ async def test_should_deploy_streams_bootstrap_app(self, mocker: MockerFixture): "test-namespace", { "nameOverride": "${pipeline.name}-example-name", + "image": "streamsBootstrap", "imageTag": "1.0.0", "kafka": { "bootstrapServers": "fake-broker:9092", @@ -93,6 +96,7 @@ async def test_should_raise_validation_error_for_invalid_image_tag(self): ): StreamsBootstrapValues( **{ + "image": "streamsBootstrap", "imageTag": "invalid image tag!", "kafka": { "bootstrapServers": "fake-broker:9092", diff --git a/tests/pipeline/resources/streams-bootstrap/pipeline.yaml b/tests/pipeline/resources/streams-bootstrap/pipeline.yaml index 54a18341b..ad6308a4a 100644 --- a/tests/pipeline/resources/streams-bootstrap/pipeline.yaml +++ b/tests/pipeline/resources/streams-bootstrap/pipeline.yaml @@ -4,6 +4,7 @@ imageTag: "1.0.0" commandLine: FAKE_ARG: "fake-arg-value" + schedule: "30 3/8 * * *" to: topics: @@ -21,6 +22,8 @@ applicationId: "my-streams-app-id" commandLine: CONVERT_XML: true + javaOptions: + maxRAMPercentage: 85 from: topics: diff --git a/tests/pipeline/snapshots/test_generate/test_streams_bootstrap/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_streams_bootstrap/pipeline.yaml index 049ec8487..49d210491 100644 --- a/tests/pipeline/snapshots/test_generate/test_streams_bootstrap/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_streams_bootstrap/pipeline.yaml @@ -10,16 +10,53 @@ suffix: -clean type: producer-app-cleaner values: + affinity: {} + backoffLimit: 6 commandLine: FAKE_ARG: fake-arg-value + configurationEnvPrefix: APP + deployment: false + env: {} + failedJobsHistoryLimit: 1 + files: {} image: my-registry/my-producer-image + imagePullPolicy: Always + imagePullSecrets: [] imageTag: 1.0.0 + javaOptions: + maxRAMPercentage: 75 + others: [] kafka: bootstrapServers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 labeledOutputTopics: my-producer-app-output-topic-label: my-labeled-producer-app-topic-output outputTopic: my-producer-app-output-topic schemaRegistryUrl: http://localhost:8081/ + livenessProbe: {} + podAnnotations: {} + podLabels: {} + ports: [] + readinessProbe: {} + resources: + limits: + cpu: 300m + memory: 2G + requests: + cpu: 100m + memory: 500Mi + restartPolicy: OnFailure + schedule: 30 3/8 * * * + secretFilesRefs: [] + secretRefs: {} + secrets: {} + service: + enabled: false + labels: {} + type: ClusterIP + successfulJobsHistoryLimit: 1 + suspend: false + tolerations: [] + ttlSecondsAfterFinished: 100 version: 3.0.1 name: my-producer-app namespace: example-namespace @@ -40,16 +77,53 @@ type: output type: my-producer-app values: + affinity: {} + backoffLimit: 6 commandLine: FAKE_ARG: fake-arg-value + configurationEnvPrefix: APP + deployment: false + env: {} + failedJobsHistoryLimit: 1 + files: {} image: my-registry/my-producer-image + imagePullPolicy: Always + imagePullSecrets: [] imageTag: 1.0.0 + javaOptions: + maxRAMPercentage: 75 + others: [] kafka: bootstrapServers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 labeledOutputTopics: my-producer-app-output-topic-label: my-labeled-producer-app-topic-output outputTopic: my-producer-app-output-topic schemaRegistryUrl: http://localhost:8081/ + livenessProbe: {} + podAnnotations: {} + podLabels: {} + ports: [] + readinessProbe: {} + resources: + limits: + cpu: 300m + memory: 2G + requests: + cpu: 100m + memory: 500Mi + restartPolicy: OnFailure + schedule: 30 3/8 * * * + secretFilesRefs: [] + secretRefs: {} + secrets: {} + service: + enabled: false + labels: {} + type: ClusterIP + successfulJobsHistoryLimit: 1 + suspend: false + tolerations: [] + ttlSecondsAfterFinished: 100 version: 3.0.1 - _cleaner: name: my-streams-app @@ -63,10 +137,23 @@ suffix: -clean type: streams-app-cleaner values: + affinity: {} commandLine: CONVERT_XML: true + configurationEnvPrefix: APP + env: {} + files: {} image: my-registry/my-streams-app-image + imagePullPolicy: Always + imagePullSecrets: [] imageTag: 1.0.0 + javaOptions: + maxRAMPercentage: 85 + others: [] + jmx: + metricRules: + - .* + port: 5555 kafka: applicationId: my-streams-app-id bootstrapServers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 @@ -85,9 +172,44 @@ my-output-topic-label: my-labeled-topic-output outputTopic: my-output-topic schemaRegistryUrl: http://localhost:8081/ + livenessProbe: {} persistence: enabled: false + podAnnotations: {} + podLabels: {} + ports: [] + prometheus: + jmx: + enabled: true + image: solsson/kafka-prometheus-jmx-exporter@sha256 + imagePullPolicy: IfNotPresent + imageTag: 6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143 + port: 5556 + resources: + limits: + cpu: 300m + memory: 2G + requests: + cpu: 100m + memory: 500Mi + readinessProbe: {} + resources: + limits: + cpu: 300m + memory: 2G + requests: + cpu: 100m + memory: 500Mi + secretFilesRefs: [] + secretRefs: {} + secrets: {} + service: + enabled: false + labels: {} + type: ClusterIP statefulSet: false + terminationGracePeriodSeconds: 300 + tolerations: [] version: 3.0.1 from: components: {} @@ -129,10 +251,23 @@ value_schema: com.bakdata.kafka.DeadLetter type: my-streams-app values: + affinity: {} commandLine: CONVERT_XML: true + configurationEnvPrefix: APP + env: {} + files: {} image: my-registry/my-streams-app-image + imagePullPolicy: Always + imagePullSecrets: [] imageTag: 1.0.0 + javaOptions: + maxRAMPercentage: 85 + others: [] + jmx: + metricRules: + - .* + port: 5555 kafka: applicationId: my-streams-app-id bootstrapServers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 @@ -151,8 +286,43 @@ my-output-topic-label: my-labeled-topic-output outputTopic: my-output-topic schemaRegistryUrl: http://localhost:8081/ + livenessProbe: {} persistence: enabled: false + podAnnotations: {} + podLabels: {} + ports: [] + prometheus: + jmx: + enabled: true + image: solsson/kafka-prometheus-jmx-exporter@sha256 + imagePullPolicy: IfNotPresent + imageTag: 6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143 + port: 5556 + resources: + limits: + cpu: 300m + memory: 2G + requests: + cpu: 100m + memory: 500Mi + readinessProbe: {} + resources: + limits: + cpu: 300m + memory: 2G + requests: + cpu: 100m + memory: 500Mi + secretFilesRefs: [] + secretRefs: {} + secrets: {} + service: + enabled: false + labels: {} + type: ClusterIP statefulSet: false + terminationGracePeriodSeconds: 300 + tolerations: [] version: 3.0.1 diff --git a/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml index e685eac55..7df64af03 100644 --- a/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml +++ b/tests/pipeline/snapshots/test_manifest/test_streams_bootstrap/manifest.yaml @@ -1,6 +1,6 @@ --- -apiVersion: batch/v1 -kind: Job +apiVersion: batch/v1beta1 +kind: CronJob metadata: labels: app: resources-streams-bootstrap-my-producer-app @@ -8,40 +8,47 @@ metadata: release: resources-streams-bootstrap-my-producer-app name: resources-streams-bootstrap-my-producer-app spec: - backoffLimit: 6 - template: - metadata: - labels: - app: resources-streams-bootstrap-my-producer-app - release: resources-streams-bootstrap-my-producer-app + concurrencyPolicy: Replace + failedJobsHistoryLimit: 1 + jobTemplate: spec: - containers: - - env: - - name: ENV_PREFIX - value: APP_ - - name: APP_BOOTSTRAP_SERVERS - value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 - - name: APP_SCHEMA_REGISTRY_URL - value: http://localhost:8081/ - - name: APP_OUTPUT_TOPIC - value: my-producer-app-output-topic - - name: APP_LABELED_OUTPUT_TOPICS - value: my-producer-app-output-topic-label=my-labeled-producer-app-topic-output, - - name: APP_FAKE_ARG - value: fake-arg-value - - name: JAVA_TOOL_OPTIONS - value: '-XX:MaxRAMPercentage=75.0 ' - image: my-registry/my-producer-image:1.0.0 - imagePullPolicy: Always - name: resources-streams-bootstrap-my-producer-app - resources: - limits: - cpu: 500m - memory: 2G - requests: - cpu: 200m - memory: 300Mi - restartPolicy: OnFailure + backoffLimit: 6 + template: + metadata: + labels: + app: resources-streams-bootstrap-my-producer-app + release: resources-streams-bootstrap-my-producer-app + spec: + containers: + - env: + - name: ENV_PREFIX + value: APP_ + - name: APP_BOOTSTRAP_SERVERS + value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + - name: APP_SCHEMA_REGISTRY_URL + value: http://localhost:8081/ + - name: APP_OUTPUT_TOPIC + value: my-producer-app-output-topic + - name: APP_LABELED_OUTPUT_TOPICS + value: my-producer-app-output-topic-label=my-labeled-producer-app-topic-output, + - name: APP_FAKE_ARG + value: fake-arg-value + - name: JAVA_TOOL_OPTIONS + value: '-XX:MaxRAMPercentage=75.0 ' + image: my-registry/my-producer-image:1.0.0 + imagePullPolicy: Always + name: resources-streams-bootstrap-my-producer-app + resources: + limits: + cpu: 500m + memory: 2G + requests: + cpu: 200m + memory: 300Mi + restartPolicy: OnFailure + schedule: 30 3/8 * * * + successfulJobsHistoryLimit: 1 + suspend: false --- apiVersion: v1 @@ -118,7 +125,7 @@ spec: value: 'true' - name: JAVA_TOOL_OPTIONS value: '-Dcom.sun.management.jmxremote.port=5555 -Dcom.sun.management.jmxremote.authenticate=false - -Dcom.sun.management.jmxremote.ssl=false -XX:MaxRAMPercentage=75.0 ' + -Dcom.sun.management.jmxremote.ssl=false -XX:MaxRAMPercentage=85.0 ' image: my-registry/my-streams-app-image:1.0.0 imagePullPolicy: Always name: resources-streams-bootstrap-my-streams-app