diff --git a/kpops/cli/main.py b/kpops/cli/main.py index ff33f1d7c..009e0bb9c 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from collections.abc import Iterator, Mapping +from collections.abc import Iterator from enum import Enum from pathlib import Path from typing import TYPE_CHECKING, Optional @@ -19,6 +19,7 @@ from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper +from kpops.components.base_components.pipeline_component import Resource from kpops.config import ENV_PREFIX, KpopsConfig from kpops.pipeline_generator.pipeline import Pipeline from kpops.utils.gen_schema import SchemaScope, gen_config_schema, gen_pipeline_schema @@ -271,7 +272,7 @@ def render( filter_type: FilterType = FILTER_TYPE, output: bool = OUTPUT_OPTION, verbose: bool = VERBOSE_OPTION, -) -> list[Mapping]: +) -> list[Resource]: pipeline = generate( pipeline_path=pipeline_path, components_module=components_module, @@ -282,13 +283,14 @@ def render( verbose=verbose, ) steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type) - manifests: list[Mapping] = [] + resources: list[Resource] = [] for component in steps_to_apply: - manifest = component.render() - manifests.append(manifest) + resource = component.render() + resources.append(resource) if output: - print_yaml(manifest) - return manifests + for manifest in resource: + print_yaml(manifest) + return resources @app.command( diff --git a/kpops/component_handlers/helm_wrapper/helm.py b/kpops/component_handlers/helm_wrapper/helm.py index f41717b3a..b898387e8 100644 --- a/kpops/component_handlers/helm_wrapper/helm.py +++ b/kpops/component_handlers/helm_wrapper/helm.py @@ -20,6 +20,7 @@ Version, ) from kpops.component_handlers.kubernetes.model import KubernetesManifest +from kpops.components.base_components.pipeline_component import Resource if TYPE_CHECKING: from collections.abc import Iterable, Iterator @@ -134,7 +135,7 @@ def template( namespace: str, values: dict, flags: HelmTemplateFlags | None = None, - ) -> KubernetesManifest: + ) -> Resource: """From Helm: Render chart templates locally and display the output. Any values that would normally be looked up or retrieved in-cluster will @@ -146,7 +147,7 @@ def template( :param namespace: The Kubernetes namespace the command should execute in :param values: `values.yaml` to be used :param flags: the flags to be set for `helm template`, defaults to HelmTemplateFlags() - :return: the rendered manifest + :return: the rendered resource (list of Kubernetes manifests) """ if flags is None: flags = HelmTemplateFlags() @@ -164,7 +165,8 @@ def template( ] command.extend(flags.to_command()) output = self.__execute(command) - return KubernetesManifest.from_yaml(output) + manifests = KubernetesManifest.from_yaml(output) + return list(manifests) def get_manifest(self, release_name: str, namespace: str) -> Iterable[HelmTemplate]: command = [ @@ -201,7 +203,10 @@ def load_manifest(yaml_contents: str) -> Iterator[HelmTemplate]: if line.startswith("---"): is_beginning = True if template_name and current_yaml_doc: - manifest = KubernetesManifest.from_yaml("\n".join(current_yaml_doc)) + manifests = KubernetesManifest.from_yaml( + "\n".join(current_yaml_doc) + ) + manifest = next(manifests) # only 1 manifest yield HelmTemplate(Path(template_name), manifest) template_name = None current_yaml_doc.clear() diff --git a/kpops/component_handlers/kubernetes/model.py b/kpops/component_handlers/kubernetes/model.py index 0a112e079..a65d49e79 100644 --- a/kpops/component_handlers/kubernetes/model.py +++ b/kpops/component_handlers/kubernetes/model.py @@ -1,5 +1,6 @@ import json from collections import UserDict +from collections.abc import Iterator from typing import TypeAlias import yaml @@ -16,9 +17,10 @@ class KubernetesManifest(UserDict[str, Json]): """Representation of a Kubernetes API object as YAML/JSON mapping.""" @classmethod - def from_yaml(cls, /, content: str) -> Self: - manifest: dict = yaml.load(content, yaml.Loader) - return cls(manifest) + def from_yaml(cls, /, content: str) -> Iterator[Self]: + manifests: Iterator[dict[str, Json]] = yaml.load_all(content, yaml.Loader) + for manifest in manifests: + yield cls(manifest) @classmethod def from_json(cls, /, content: str) -> Self: diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 55198ff11..ab1bcaa05 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -16,8 +16,8 @@ HelmTemplateFlags, HelmUpgradeInstallFlags, ) -from kpops.component_handlers.kubernetes.model import KubernetesManifest from kpops.components.base_components.kubernetes_app import KubernetesApp +from kpops.components.base_components.pipeline_component import Resource from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr @@ -96,7 +96,7 @@ def template_flags(self) -> HelmTemplateFlags: ) @override - def render(self) -> KubernetesManifest: + def render(self) -> Resource: return self.helm.template( self.helm_release_name, self.helm_chart, diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index fd29729fb..bd63d1451 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -24,10 +24,12 @@ KafkaConnectResetterConfig, KafkaConnectResetterValues, ) -from kpops.component_handlers.kubernetes.model import KubernetesManifest from kpops.components.base_components.base_defaults_component import deduplicate from kpops.components.base_components.models.from_section import FromTopic -from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.base_components.pipeline_component import ( + PipelineComponent, + Resource, +) from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr @@ -285,7 +287,7 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: raise NotImplementedError(msg) @override - def render(self) -> KubernetesManifest: + def render(self) -> Resource: values = self._get_kafka_connect_resetter_values( offset_topic=self.offset_topic, ) @@ -331,7 +333,7 @@ def add_input_topics(self, topics: list[str]) -> None: setattr(self.app, "topics", ",".join(topics)) @override - def render(self) -> KubernetesManifest: + def render(self) -> Resource: values = self._get_kafka_connect_resetter_values() return self.helm.template( self._resetter_release_name, diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index 732fe6049..5a7901eba 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -2,6 +2,7 @@ from abc import ABC from collections.abc import Mapping +from typing import TypeAlias from pydantic import Extra, Field @@ -21,6 +22,8 @@ from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import DescConfig +Resource: TypeAlias = list[Mapping] # representation of final resource # TODO: move? + class PipelineComponent(BaseDefaultsComponent, ABC): """Base class for all components. @@ -187,9 +190,9 @@ def inflate(self) -> list[PipelineComponent]: """ return [self] - def render(self) -> Mapping: + def render(self) -> Resource: """Render final component resources, e.g. Kubernetes manifest.""" - return {} + return [] def deploy(self, dry_run: bool) -> None: """Deploy component, e.g. to the Kubernetes cluster. diff --git a/tests/component_handlers/kubernetes/model.py b/tests/component_handlers/kubernetes/model.py index fec367b69..334c1f937 100644 --- a/tests/component_handlers/kubernetes/model.py +++ b/tests/component_handlers/kubernetes/model.py @@ -21,15 +21,18 @@ class TestKubernetesManifest: foo: bar """ ), - KubernetesManifest( - { - "apiVersion": "v1", - "kind": "ServiceAccount", - "metadata": {"labels": {"foo": "bar"}}, - } - ), + [ + KubernetesManifest( + { + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": {"labels": {"foo": "bar"}}, + } + ) + ], ) ], ) def test_from_yaml(self, helm_template: str, expected_manifest: KubernetesManifest): - assert KubernetesManifest.from_yaml(helm_template) == expected_manifest + manifests = KubernetesManifest.from_yaml(helm_template) + assert list(manifests) == expected_manifest diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index d5a2d825e..09a4f87b4 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -23,6 +23,7 @@ def test_python_api_generate(self): "tests.pipeline.test_components", pipeline_base_dir=PIPELINE_BASE_DIR_PATH, defaults=RESOURCE_PATH, + output=False, ) assert len(pipeline) == 3 diff --git a/tests/pipeline/test_render.py b/tests/pipeline/test_render.py index 20a832b8f..0736b8723 100644 --- a/tests/pipeline/test_render.py +++ b/tests/pipeline/test_render.py @@ -5,14 +5,16 @@ from pytest_mock import MockerFixture from typer.testing import CliRunner +import kpops from kpops.cli.main import app from kpops.component_handlers.helm_wrapper.helm import Helm from kpops.component_handlers.helm_wrapper.model import HelmConfig, Version +from kpops.component_handlers.kubernetes.model import KubernetesManifest runner = CliRunner() RESOURCE_PATH = Path(__file__).parent / "resources" -PIPELINE_BASE_DIR = str(RESOURCE_PATH.parent) +PIPELINE_BASE_DIR_PATH = RESOURCE_PATH.parent class TestRender: @@ -38,7 +40,7 @@ def test_render_default_config(self, mock_execute: MagicMock): [ "render", "--pipeline-base-dir", - PIPELINE_BASE_DIR, + str(PIPELINE_BASE_DIR_PATH), str(RESOURCE_PATH / "custom-config/pipeline.yaml"), "--defaults", str(RESOURCE_PATH / "no-topics-defaults"), @@ -70,7 +72,7 @@ def test_render_custom_config(self, mock_execute: MagicMock): [ "render", "--pipeline-base-dir", - PIPELINE_BASE_DIR, + str(PIPELINE_BASE_DIR_PATH), str(RESOURCE_PATH / "custom-config/pipeline.yaml"), "--defaults", str(RESOURCE_PATH / "no-topics-defaults"), @@ -99,3 +101,242 @@ def test_render_custom_config(self, mock_execute: MagicMock): ], ) assert result.exit_code == 0 + + def test_python_api(self): + steps = kpops.render( + RESOURCE_PATH / "custom-config/pipeline.yaml", + pipeline_base_dir=PIPELINE_BASE_DIR_PATH, + defaults=RESOURCE_PATH / "no-topics-defaults", + output=False, + ) + assert len(steps) == 2 + manifests = steps[0] + assert len(manifests) == 1 + assert isinstance(manifests[0], KubernetesManifest) + assert manifests[0] == { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "labels": { + "app": "resources-custom-config-app1", + "chart": "producer-app-2.9.0", + "release": "resources-custom-config-app1", + }, + "name": "resources-custom-config-app1", + }, + "spec": { + "backoffLimit": 6, + "template": { + "metadata": { + "labels": { + "app": "resources-custom-config-app1", + "release": "resources-custom-config-app1", + } + }, + "spec": { + "affinity": None, + "containers": [ + { + "env": [ + {"name": "ENV_PREFIX", "value": "APP_"}, + { + "name": "APP_BROKERS", + "value": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", + }, + { + "name": "APP_SCHEMA_REGISTRY_URL", + "value": "http://localhost:8081", + }, + {"name": "APP_DEBUG", "value": "false"}, + { + "name": "APP_OUTPUT_TOPIC", + "value": "resources-custom-config-app1", + }, + { + "name": "JAVA_TOOL_OPTIONS", + "value": "-XX:MaxRAMPercentage=75.0 ", + }, + ], + "image": "producerApp:latest", + "imagePullPolicy": "Always", + "name": "resources-custom-config-app1", + "resources": { + "limits": {"cpu": "500m", "memory": "2G"}, + "requests": {"cpu": "200m", "memory": "2G"}, + }, + } + ], + "restartPolicy": "OnFailure", + }, + }, + }, + } + + manifests = steps[1] + assert len(manifests) == 2 + assert all(isinstance(manifest, KubernetesManifest) for manifest in manifests) + assert manifests[0] == { + "apiVersion": "v1", + "data": { + "jmx-kafka-streams-app-prometheus.yml": "jmxUrl: " + "service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi\n" + "lowercaseOutputName: true\n" + "lowercaseOutputLabelNames: " + "true\n" + "ssl: false\n" + "rules:\n" + ' - pattern: ".*"\n' + }, + "kind": "ConfigMap", + "metadata": { + "labels": { + "app": "resources-custom-config-app2", + "chart": "streams-app-2.9.0", + "heritage": "Helm", + "release": "resources-custom-config-app2", + }, + "name": "resources-custom-config-app2-jmx-configmap", + }, + } + assert manifests[1] == { + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": { + "labels": { + "app": "resources-custom-config-app2", + "chart": "streams-app-2.9.0", + "pipeline": "resources-custom-config", + "release": "resources-custom-config-app2", + }, + "name": "resources-custom-config-app2", + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "resources-custom-config-app2", + "release": "resources-custom-config-app2", + } + }, + "template": { + "metadata": { + "annotations": { + "prometheus.io/port": "5556", + "prometheus.io/scrape": "true", + }, + "labels": { + "app": "resources-custom-config-app2", + "pipeline": "resources-custom-config", + "release": "resources-custom-config-app2", + }, + }, + "spec": { + "affinity": { + "podAntiAffinity": { + "preferredDuringSchedulingIgnoredDuringExecution": [ + { + "podAffinityTerm": { + "labelSelector": { + "matchExpressions": [ + { + "key": "app", + "operator": "In", + "values": [ + "resources-custom-config-app2" + ], + } + ] + }, + "topologyKey": "kubernetes.io/hostname", + }, + "weight": 1, + } + ] + } + }, + "containers": [ + { + "env": [ + {"name": "ENV_PREFIX", "value": "APP_"}, + {"name": "KAFKA_JMX_PORT", "value": "5555"}, + { + "name": "APP_VOLATILE_GROUP_INSTANCE_ID", + "value": "true", + }, + { + "name": "APP_BROKERS", + "value": "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092", + }, + { + "name": "APP_SCHEMA_REGISTRY_URL", + "value": "http://localhost:8081", + }, + {"name": "APP_DEBUG", "value": "false"}, + { + "name": "APP_INPUT_TOPICS", + "value": "resources-custom-config-app1", + }, + { + "name": "APP_OUTPUT_TOPIC", + "value": "resources-custom-config-app2", + }, + { + "name": "APP_ERROR_TOPIC", + "value": "resources-custom-config-app2-error", + }, + { + "name": "JAVA_TOOL_OPTIONS", + "value": "-Dcom.sun.management.jmxremote.port=5555 " + "-Dcom.sun.management.jmxremote.authenticate=false " + "-Dcom.sun.management.jmxremote.ssl=false " + "-XX:MaxRAMPercentage=75.0 ", + }, + ], + "image": "some-image:latest", + "imagePullPolicy": "Always", + "name": "resources-custom-config-app2", + "ports": [{"containerPort": 5555, "name": "jmx"}], + "resources": { + "limits": {"cpu": "500m", "memory": "2G"}, + "requests": {"cpu": "200m", "memory": "300Mi"}, + }, + }, + { + "command": [ + "java", + "-XX:+UnlockExperimentalVMOptions", + "-XX:+UseCGroupMemoryLimitForHeap", + "-XX:MaxRAMFraction=1", + "-XshowSettings:vm", + "-jar", + "jmx_prometheus_httpserver.jar", + "5556", + "/etc/jmx-streams-app/jmx-kafka-streams-app-prometheus.yml", + ], + "image": "solsson/kafka-prometheus-jmx-exporter@sha256:6f82e2b0464f50da8104acd7363fb9b995001ddff77d248379f8788e78946143", + "name": "prometheus-jmx-exporter", + "ports": [{"containerPort": 5556}], + "resources": { + "limits": {"cpu": "300m", "memory": "2G"}, + "requests": {"cpu": "100m", "memory": "500Mi"}, + }, + "volumeMounts": [ + { + "mountPath": "/etc/jmx-streams-app", + "name": "jmx-config", + } + ], + }, + ], + "volumes": [ + { + "configMap": { + "name": "resources-custom-config-app2-jmx-configmap" + }, + "name": "jmx-config", + } + ], + }, + }, + }, + }