From 62bfb0339236a62cf5518fced8823954cddf7479 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 12:29:05 +0100 Subject: [PATCH 01/27] Refactor workaround for setting Helm app `nameOverride` --- docs/docs/schema/pipeline.json | 33 ++++++++++++------- kpops/components/base_components/helm_app.py | 16 ++++++++- kpops/components/base_components/kafka_app.py | 9 ++--- kpops/pipeline.py | 10 ------ tests/components/test_helm_app.py | 32 ++++++++++-------- 5 files changed, 57 insertions(+), 43 deletions(-) diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 0882ccfa5..dcc11287c 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -66,10 +66,10 @@ "app": { "allOf": [ { - "$ref": "#/$defs/KubernetesAppConfig" + "$ref": "#/$defs/HelmAppConfig" } ], - "description": "Application-specific settings" + "description": "Helm app values" }, "from": { "anyOf": [ @@ -146,6 +146,26 @@ "title": "HelmApp", "type": "object" }, + "HelmAppConfig": { + "additionalProperties": true, + "description": "", + "properties": { + "nameOverride": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Nameoverride" + } + }, + "title": "HelmAppConfig", + "type": "object" + }, "HelmRepoConfig": { "description": "Helm repository configuration.", "properties": { @@ -420,13 +440,6 @@ "title": "KafkaSourceConnector", "type": "object" }, - "KubernetesAppConfig": { - "additionalProperties": true, - "description": "Settings specific to Kubernetes apps.", - "properties": {}, - "title": "KubernetesAppConfig", - "type": "object" - }, "OutputTopicTypes": { "description": "Types of output topic.\n\nOUTPUT (output topic), ERROR (error topic)", "enum": [ @@ -588,7 +601,6 @@ } ], "default": null, - "description": "Override name with this value", "title": "Nameoverride" }, "streams": { @@ -873,7 +885,6 @@ } ], "default": null, - "description": "Override name with this value", "title": "Nameoverride" }, "streams": { diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 5d70bacfd..1c61b0473 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -16,7 +16,10 @@ HelmTemplateFlags, HelmUpgradeInstallFlags, ) -from kpops.components.base_components.kubernetes_app import KubernetesApp +from kpops.components.base_components.kubernetes_app import ( + KubernetesApp, + KubernetesAppConfig, +) from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import exclude_by_name @@ -24,6 +27,10 @@ log = logging.getLogger("HelmApp") +class HelmAppConfig(KubernetesAppConfig): # TODO: rename HelmAppValues + name_override: str | None = None + + class HelmApp(KubernetesApp): """Kubernetes app managed through Helm with an associated Helm chart. @@ -31,6 +38,7 @@ class HelmApp(KubernetesApp): deploying the component, defaults to None this means that the command "helm repo add" is not called and Helm expects a path to local Helm chart. :param version: Helm chart version, defaults to None + :param app: Helm app values """ repo_config: HelmRepoConfig | None = Field( @@ -41,6 +49,10 @@ class HelmApp(KubernetesApp): default=None, description=describe_attr("version", __doc__), ) + app: HelmAppConfig = Field( + default=..., + description=describe_attr("app", __doc__), + ) @cached_property def helm(self) -> Helm: @@ -142,6 +154,8 @@ def to_helm_values(self) -> dict: :returns: Thte values to be used by Helm """ + if self.app.name_override is None: + self.app.name_override = self.full_name return self.app.model_dump( by_alias=True, exclude_none=True, exclude_defaults=True ) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index b62e54bab..f4e766233 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -11,8 +11,7 @@ HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import trim_release_name -from kpops.components.base_components.helm_app import HelmApp -from kpops.components.base_components.kubernetes_app import KubernetesAppConfig +from kpops.components.base_components.helm_app import HelmApp, HelmAppConfig from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import CamelCaseConfigModel, DescConfigModel @@ -36,19 +35,15 @@ class KafkaStreamsConfig(CamelCaseConfigModel, DescConfigModel): ) -class KafkaAppConfig(KubernetesAppConfig): +class KafkaAppConfig(HelmAppConfig): """Settings specific to Kafka Apps. :param streams: Kafka streams config - :param name_override: Override name with this value, defaults to None """ streams: KafkaStreamsConfig = Field( default=..., description=describe_attr("streams", __doc__) ) - name_override: str | None = Field( - default=None, description=describe_attr("name_override", __doc__) - ) class KafkaApp(HelmApp, ABC): diff --git a/kpops/pipeline.py b/kpops/pipeline.py index ad69521e1..a409f8e35 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -3,7 +3,6 @@ import json import logging from collections import Counter -from contextlib import suppress from dataclasses import dataclass, field from typing import TYPE_CHECKING @@ -53,7 +52,6 @@ def find(self, component_name: str) -> PipelineComponent: raise ValueError(msg) def add(self, component: PipelineComponent) -> None: - self._populate_component_name(component) self.root.append(component) def __bool__(self) -> bool: @@ -78,14 +76,6 @@ def validate_unique_names(self) -> None: msg = f"step names should be unique. duplicate step names: {', '.join(duplicates)}" raise ValidationError(msg) - @staticmethod - def _populate_component_name(component: PipelineComponent) -> None: # TODO: remove - with suppress( - AttributeError # Some components like Kafka Connect do not have a name_override attribute - ): - if (app := getattr(component, "app")) and app.name_override is None: - app.name_override = component.full_name - def create_env_components_index( environment_components: list[dict], diff --git a/tests/components/test_helm_app.py b/tests/components/test_helm_app.py index 0b933b1e9..021617c94 100644 --- a/tests/components/test_helm_app.py +++ b/tests/components/test_helm_app.py @@ -12,18 +12,13 @@ HelmUpgradeInstallFlags, RepoAuthFlags, ) -from kpops.components.base_components.helm_app import HelmApp -from kpops.components.base_components.kubernetes_app import KubernetesAppConfig +from kpops.components.base_components.helm_app import HelmApp, HelmAppConfig from kpops.config import KpopsConfig from kpops.utils.colorify import magentaify DEFAULTS_PATH = Path(__file__).parent / "resources" -class HelmTestValue(KubernetesAppConfig): - name_override: str - - class TestHelmApp: @pytest.fixture() def config(self) -> KpopsConfig: @@ -51,8 +46,8 @@ def log_info_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch("kpops.components.base_components.helm_app.log.info") @pytest.fixture() - def app_value(self) -> HelmTestValue: - return HelmTestValue(name_override="test-value") + def app_value(self) -> HelmAppConfig: + return HelmAppConfig(**{"foo": "test-value"}) @pytest.fixture() def repo_config(self) -> HelmRepoConfig: @@ -63,7 +58,7 @@ def helm_app( self, config: KpopsConfig, handlers: ComponentHandlers, - app_value: HelmTestValue, + app_value: HelmAppConfig, repo_config: HelmRepoConfig, ) -> HelmApp: return HelmApp( @@ -97,7 +92,10 @@ def test_should_lazy_load_helm_wrapper_and_not_repo_add( "test/test-chart", False, "test-namespace", - {"nameOverride": "test-value"}, + { + "nameOverride": "${pipeline_name}-test-helm-app", + "foo": "test-value", + }, HelmUpgradeInstallFlags(), ) @@ -107,7 +105,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( handlers: ComponentHandlers, helm_mock: MagicMock, mocker: MockerFixture, - app_value: HelmTestValue, + app_value: HelmAppConfig, ): repo_config = HelmRepoConfig( repository_name="test-repo", url="https://test.com/charts/" @@ -142,7 +140,10 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( "test/test-chart", False, "test-namespace", - {"nameOverride": "test-value"}, + { + "nameOverride": "${pipeline_name}-test-helm-app", + "foo": "test-value", + }, HelmUpgradeInstallFlags(version="3.4.5"), ), ] @@ -152,7 +153,7 @@ def test_should_deploy_app_with_local_helm_chart( config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, - app_value: HelmTestValue, + app_value: HelmAppConfig, ): class AppWithLocalChart(HelmApp): repo_config: None = None @@ -179,7 +180,10 @@ def helm_chart(self) -> str: "path/to/helm/charts/", False, "test-namespace", - {"nameOverride": "test-value"}, + { + "nameOverride": "${pipeline_name}-test-app-with-local-chart", + "foo": "test-value", + }, HelmUpgradeInstallFlags(), ) From 55089bebfee3765d818622e984f88140f8925660 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 13:30:53 +0100 Subject: [PATCH 02/27] Update snapshot assertions --- tests/components/test_kafka_app.py | 1 + tests/components/test_producer_app.py | 3 ++ tests/components/test_streams_app.py | 5 ++- tests/pipeline/snapshots/snap_test_example.py | 5 --- .../pipeline/snapshots/snap_test_pipeline.py | 37 ------------------- 5 files changed, 8 insertions(+), 43 deletions(-) diff --git a/tests/components/test_kafka_app.py b/tests/components/test_kafka_app.py index 06af5d4f5..1ec8d3752 100644 --- a/tests/components/test_kafka_app.py +++ b/tests/components/test_kafka_app.py @@ -97,6 +97,7 @@ def test_should_deploy_kafka_app( True, "test-namespace", { + "nameOverride": "${pipeline_name}-example-name", "streams": {"brokers": "fake-broker:9092", "outputTopic": "test"}, }, HelmUpgradeInstallFlags(version="1.2.3"), diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 89ca25bdd..66ab9861f 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -120,6 +120,7 @@ def test_deploy_order_when_dry_run_is_false( False, "test-namespace", { + "nameOverride": "${pipeline_name}-" + self.PRODUCER_APP_NAME, "streams": { "brokers": "fake-broker:9092", "outputTopic": "${output_topic_name}", @@ -184,6 +185,7 @@ def test_should_not_reset_producer_app( True, "test-namespace", { + "nameOverride": "${pipeline_name}-" + self.PRODUCER_APP_NAME, "streams": { "brokers": "fake-broker:9092", "outputTopic": "${output_topic_name}", @@ -229,6 +231,7 @@ def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_w False, "test-namespace", { + "nameOverride": "${pipeline_name}-" + self.PRODUCER_APP_NAME, "streams": { "brokers": "fake-broker:9092", "outputTopic": "${output_topic_name}", diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 93f6022f2..d4e6fb6d9 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -323,6 +323,7 @@ def test_deploy_order_when_dry_run_is_false( dry_run, "test-namespace", { + "nameOverride": "${pipeline_name}-" + self.STREAMS_APP_NAME, "streams": { "brokers": "fake-broker:9092", "extraOutputTopics": { @@ -331,7 +332,7 @@ def test_deploy_order_when_dry_run_is_false( }, "outputTopic": "${output_topic_name}", "errorTopic": "${error_topic_name}", - } + }, }, HelmUpgradeInstallFlags( create_namespace=False, @@ -384,6 +385,7 @@ def test_reset_when_dry_run_is_false( dry_run, "test-namespace", { + "nameOverride": "${pipeline_name}-" + self.STREAMS_APP_NAME, "streams": { "brokers": "fake-broker:9092", "outputTopic": "${output_topic_name}", @@ -428,6 +430,7 @@ def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up( dry_run, "test-namespace", { + "nameOverride": "${pipeline_name}-" + self.STREAMS_APP_NAME, "streams": { "brokers": "fake-broker:9092", "outputTopic": "${output_topic_name}", diff --git a/tests/pipeline/snapshots/snap_test_example.py b/tests/pipeline/snapshots/snap_test_example.py index 14d3d650c..a88a7ee4a 100644 --- a/tests/pipeline/snapshots/snap_test_example.py +++ b/tests/pipeline/snapshots/snap_test_example.py @@ -13,7 +13,6 @@ 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-accountproducer', 'imageTag': '1.0.0', - 'nameOverride': 'account-producer', 'prometheus': { 'jmx': { 'enabled': False @@ -64,7 +63,6 @@ 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-transactionavroproducer', 'imageTag': '1.0.0', - 'nameOverride': 'transaction-avro-producer', 'prometheus': { 'jmx': { 'enabled': False @@ -120,7 +118,6 @@ 'labels': { 'pipeline': 'bakdata-atm-fraud-detection' }, - 'nameOverride': 'transaction-joiner', 'prometheus': { 'jmx': { 'enabled': False @@ -182,7 +179,6 @@ 'labels': { 'pipeline': 'bakdata-atm-fraud-detection' }, - 'nameOverride': 'fraud-detector', 'prometheus': { 'jmx': { 'enabled': False @@ -244,7 +240,6 @@ 'labels': { 'pipeline': 'bakdata-atm-fraud-detection' }, - 'nameOverride': 'account-linker', 'prometheus': { 'jmx': { 'enabled': False diff --git a/tests/pipeline/snapshots/snap_test_pipeline.py b/tests/pipeline/snapshots/snap_test_pipeline.py index c9fee4d4b..0da4f9260 100644 --- a/tests/pipeline/snapshots/snap_test_pipeline.py +++ b/tests/pipeline/snapshots/snap_test_pipeline.py @@ -10,7 +10,6 @@ snapshots['TestPipeline.test_default_config test-pipeline'] = [ { 'app': { - 'nameOverride': 'resources-custom-config-app1', 'resources': { 'limits': { 'memory': '2G' @@ -58,7 +57,6 @@ 'labels': { 'pipeline': 'resources-custom-config' }, - 'nameOverride': 'resources-custom-config-app2', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'errorTopic': 'resources-custom-config-app2-error', @@ -110,7 +108,6 @@ }, 'image': 'example-registry/fake-image', 'imageTag': '0.0.1', - 'nameOverride': 'resources-pipeline-with-inflate-scheduled-producer', 'schedule': '30 3/8 * * *', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', @@ -165,7 +162,6 @@ 'commandLine': { 'CONVERT_XML': True }, - 'nameOverride': 'resources-pipeline-with-inflate-converter', 'resources': { 'limits': { 'memory': '2G' @@ -242,7 +238,6 @@ }, 'image': 'fake-registry/filter', 'imageTag': '2.4.1', - 'nameOverride': 'resources-pipeline-with-inflate-should-inflate', 'replicaCount': 4, 'resources': { 'requests': { @@ -345,7 +340,6 @@ }, { 'app': { - 'nameOverride': 'resources-pipeline-with-inflate-should-inflate-inflated-streams-app', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -397,7 +391,6 @@ { 'app': { 'image': 'fake-image', - 'nameOverride': 'resources-kafka-connect-sink-streams-app', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -492,7 +485,6 @@ }, 'image': 'example-registry/fake-image', 'imageTag': '0.0.1', - 'nameOverride': 'resources-first-pipeline-scheduled-producer', 'schedule': '30 3/8 * * *', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', @@ -547,7 +539,6 @@ 'commandLine': { 'CONVERT_XML': True }, - 'nameOverride': 'resources-first-pipeline-converter', 'resources': { 'limits': { 'memory': '2G' @@ -624,7 +615,6 @@ }, 'image': 'fake-registry/filter', 'imageTag': '2.4.1', - 'nameOverride': 'resources-first-pipeline-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name', 'replicaCount': 4, 'resources': { 'requests': { @@ -683,7 +673,6 @@ snapshots['TestPipeline.test_model_serialization test-pipeline'] = [ { 'app': { - 'nameOverride': 'resources-pipeline-with-paths-account-producer', 'streams': { 'brokers': 'test', 'extraOutputTopics': { @@ -716,7 +705,6 @@ 'commandLine': { 'CONVERT_XML': True }, - 'nameOverride': 'resources-no-input-topic-pipeline-app1', 'resources': { 'limits': { 'memory': '2G' @@ -779,7 +767,6 @@ }, { 'app': { - 'nameOverride': 'resources-no-input-topic-pipeline-app2', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -839,7 +826,6 @@ { 'app': { 'image': 'fake-image', - 'nameOverride': 'resources-no-user-defined-components-streams-app', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -904,7 +890,6 @@ }, 'image': 'example-registry/fake-image', 'imageTag': '0.0.1', - 'nameOverride': 'resources-pipeline-with-envs-input-producer', 'schedule': '20 3/8 * * *', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', @@ -959,7 +944,6 @@ 'commandLine': { 'CONVERT_XML': True }, - 'nameOverride': 'resources-pipeline-with-envs-converter', 'resources': { 'limits': { 'memory': '2G' @@ -1036,7 +1020,6 @@ }, 'image': 'fake-registry/filter', 'imageTag': '2.4.1', - 'nameOverride': 'resources-pipeline-with-envs-filter', 'replicaCount': 4, 'resources': { 'requests': { @@ -1098,7 +1081,6 @@ 'debug': True, 'image': '${DOCKER_REGISTRY}/atm-demo-accountproducer', 'imageTag': '1.0.0', - 'nameOverride': 'from-pipeline-component-account-producer', 'prometheus': { 'jmx': { 'enabled': False @@ -1132,7 +1114,6 @@ snapshots['TestPipeline.test_read_from_component test-pipeline'] = [ { 'app': { - 'nameOverride': 'resources-read-from-component-producer1', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'extraOutputTopics': { @@ -1167,7 +1148,6 @@ }, { 'app': { - 'nameOverride': 'producer2', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'extraOutputTopics': { @@ -1217,7 +1197,6 @@ }, 'image': 'fake-registry/filter', 'imageTag': '2.4.1', - 'nameOverride': 'resources-read-from-component-inflate-step', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1314,7 +1293,6 @@ }, { 'app': { - 'nameOverride': 'resources-read-from-component-inflate-step-inflated-streams-app', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1377,7 +1355,6 @@ }, 'image': 'fake-registry/filter', 'imageTag': '2.4.1', - 'nameOverride': 'inflate-step-without-prefix', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1474,7 +1451,6 @@ }, { 'app': { - 'nameOverride': 'resources-read-from-component-inflate-step-without-prefix-inflated-streams-app', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1522,7 +1498,6 @@ }, { 'app': { - 'nameOverride': 'resources-read-from-component-consumer1', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1579,7 +1554,6 @@ }, { 'app': { - 'nameOverride': 'resources-read-from-component-consumer2', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1634,7 +1608,6 @@ }, { 'app': { - 'nameOverride': 'resources-read-from-component-consumer3', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1689,7 +1662,6 @@ }, { 'app': { - 'nameOverride': 'resources-read-from-component-consumer4', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1740,7 +1712,6 @@ }, { 'app': { - 'nameOverride': 'resources-read-from-component-consumer5', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { @@ -1804,7 +1775,6 @@ 'app_schedule': '30 3/8 * * *', 'app_type': 'scheduled-producer' }, - 'nameOverride': 'resources-component-type-substitution-scheduled-producer', 'schedule': '30 3/8 * * *', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', @@ -1859,7 +1829,6 @@ 'commandLine': { 'CONVERT_XML': True }, - 'nameOverride': 'resources-component-type-substitution-converter', 'resources': { 'limits': { 'memory': '2G' @@ -1943,7 +1912,6 @@ 'filter': 'filter-app-filter', 'test_placeholder_in_placeholder': 'filter-app-filter' }, - 'nameOverride': 'resources-component-type-substitution-filter-app', 'replicaCount': 4, 'resources': { 'requests': { @@ -2002,7 +1970,6 @@ snapshots['TestPipeline.test_with_custom_config_with_absolute_defaults_path test-pipeline'] = [ { 'app': { - 'nameOverride': 'resources-custom-config-app1', 'resources': { 'limits': { 'memory': '2G' @@ -2050,7 +2017,6 @@ 'labels': { 'pipeline': 'resources-custom-config' }, - 'nameOverride': 'resources-custom-config-app2', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'errorTopic': 'app2-dead-letter-topic', @@ -2097,7 +2063,6 @@ snapshots['TestPipeline.test_with_custom_config_with_relative_defaults_path test-pipeline'] = [ { 'app': { - 'nameOverride': 'resources-custom-config-app1', 'resources': { 'limits': { 'memory': '2G' @@ -2145,7 +2110,6 @@ 'labels': { 'pipeline': 'resources-custom-config' }, - 'nameOverride': 'resources-custom-config-app2', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'errorTopic': 'app2-dead-letter-topic', @@ -2193,7 +2157,6 @@ { 'app': { 'image': 'fake-image', - 'nameOverride': 'resources-kafka-connect-sink-streams-app-development', 'streams': { 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', 'config': { From 8d9bb60124da9400cd9e73a7f7aebd16fc32af86 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 13:36:08 +0100 Subject: [PATCH 03/27] Use consistent naming for app values --- docs/docs/schema/pipeline.json | 76 +++++++++---------- kpops/components/base_components/helm_app.py | 6 +- kpops/components/base_components/kafka_app.py | 6 +- .../base_components/kubernetes_app.py | 4 +- .../streams_bootstrap/producer/model.py | 4 +- .../producer/producer_app.py | 4 +- .../streams_bootstrap/streams/model.py | 4 +- .../streams_bootstrap/streams/streams_app.py | 4 +- tests/components/test_helm_app.py | 12 +-- tests/components/test_kubernetes_app.py | 4 +- 10 files changed, 62 insertions(+), 62 deletions(-) diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index dcc11287c..ea5ce7f7e 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -66,7 +66,7 @@ "app": { "allOf": [ { - "$ref": "#/$defs/HelmAppConfig" + "$ref": "#/$defs/HelmAppValues" } ], "description": "Helm app values" @@ -146,7 +146,7 @@ "title": "HelmApp", "type": "object" }, - "HelmAppConfig": { + "HelmAppValues": { "additionalProperties": true, "description": "", "properties": { @@ -163,7 +163,7 @@ "title": "Nameoverride" } }, - "title": "HelmAppConfig", + "title": "HelmAppValues", "type": "object" }, "HelmRepoConfig": { @@ -456,7 +456,7 @@ "app": { "allOf": [ { - "$ref": "#/$defs/ProducerValues" + "$ref": "#/$defs/ProducerAppValues" } ], "description": "Application-specific settings" @@ -536,6 +536,37 @@ "title": "ProducerApp", "type": "object" }, + "ProducerAppValues": { + "additionalProperties": true, + "description": "Settings specific to producers.", + "properties": { + "nameOverride": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Nameoverride" + }, + "streams": { + "allOf": [ + { + "$ref": "#/$defs/ProducerStreamsConfig" + } + ], + "description": "Kafka Streams settings" + } + }, + "required": [ + "streams" + ], + "title": "ProducerAppValues", + "type": "object" + }, "ProducerStreamsConfig": { "additionalProperties": true, "description": "Kafka Streams settings specific to Producer.", @@ -587,37 +618,6 @@ "title": "ProducerStreamsConfig", "type": "object" }, - "ProducerValues": { - "additionalProperties": true, - "description": "Settings specific to producers.", - "properties": { - "nameOverride": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "title": "Nameoverride" - }, - "streams": { - "allOf": [ - { - "$ref": "#/$defs/ProducerStreamsConfig" - } - ], - "description": "Kafka Streams settings" - } - }, - "required": [ - "streams" - ], - "title": "ProducerValues", - "type": "object" - }, "RepoAuthFlags": { "description": "Authorisation-related flags for `helm repo`.", "properties": { @@ -692,7 +692,7 @@ "app": { "allOf": [ { - "$ref": "#/$defs/StreamsAppConfig" + "$ref": "#/$defs/StreamsAppValues" } ], "description": "Application-specific settings" @@ -859,7 +859,7 @@ "title": "StreamsAppAutoScaling", "type": "object" }, - "StreamsAppConfig": { + "StreamsAppValues": { "additionalProperties": true, "description": "StreamsBoostrap app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", "properties": { @@ -899,7 +899,7 @@ "required": [ "streams" ], - "title": "StreamsAppConfig", + "title": "StreamsAppValues", "type": "object" }, "StreamsConfig": { diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 1c61b0473..4583b7acb 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -18,7 +18,7 @@ ) from kpops.components.base_components.kubernetes_app import ( KubernetesApp, - KubernetesAppConfig, + KubernetesAppValues, ) from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr @@ -27,7 +27,7 @@ log = logging.getLogger("HelmApp") -class HelmAppConfig(KubernetesAppConfig): # TODO: rename HelmAppValues +class HelmAppValues(KubernetesAppValues): name_override: str | None = None @@ -49,7 +49,7 @@ class HelmApp(KubernetesApp): default=None, description=describe_attr("version", __doc__), ) - app: HelmAppConfig = Field( + app: HelmAppValues = Field( default=..., description=describe_attr("app", __doc__), ) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index f4e766233..df6f54568 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -11,7 +11,7 @@ HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import trim_release_name -from kpops.components.base_components.helm_app import HelmApp, HelmAppConfig +from kpops.components.base_components.helm_app import HelmApp, HelmAppValues from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import CamelCaseConfigModel, DescConfigModel @@ -35,7 +35,7 @@ class KafkaStreamsConfig(CamelCaseConfigModel, DescConfigModel): ) -class KafkaAppConfig(HelmAppConfig): +class KafkaAppValues(HelmAppValues): """Settings specific to Kafka Apps. :param streams: Kafka streams config @@ -58,7 +58,7 @@ class KafkaApp(HelmApp, ABC): :param version: Helm chart version, defaults to "2.9.0" """ - app: KafkaAppConfig = Field( + app: KafkaAppValues = Field( default=..., description=describe_attr("app", __doc__), ) diff --git a/kpops/components/base_components/kubernetes_app.py b/kpops/components/base_components/kubernetes_app.py index cae474cee..2b4065191 100644 --- a/kpops/components/base_components/kubernetes_app.py +++ b/kpops/components/base_components/kubernetes_app.py @@ -18,7 +18,7 @@ ) -class KubernetesAppConfig(CamelCaseConfigModel, DescConfigModel): +class KubernetesAppValues(CamelCaseConfigModel, DescConfigModel): """Settings specific to Kubernetes apps.""" model_config = ConfigDict( @@ -39,7 +39,7 @@ class KubernetesApp(PipelineComponent, ABC): default=..., description=describe_attr("namespace", __doc__), ) - app: KubernetesAppConfig = Field( + app: KubernetesAppValues = Field( default=..., description=describe_attr("app", __doc__), ) diff --git a/kpops/components/streams_bootstrap/producer/model.py b/kpops/components/streams_bootstrap/producer/model.py index 01bda1dbc..53db5af67 100644 --- a/kpops/components/streams_bootstrap/producer/model.py +++ b/kpops/components/streams_bootstrap/producer/model.py @@ -1,7 +1,7 @@ from pydantic import ConfigDict, Field from kpops.components.base_components.kafka_app import ( - KafkaAppConfig, + KafkaAppValues, KafkaStreamsConfig, ) from kpops.utils.docstring import describe_attr @@ -22,7 +22,7 @@ class ProducerStreamsConfig(KafkaStreamsConfig): ) -class ProducerValues(KafkaAppConfig): +class ProducerAppValues(KafkaAppValues): """Settings specific to producers. :param streams: Kafka Streams settings diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 6091cdd77..e37529bae 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -9,7 +9,7 @@ TopicConfig, ) from kpops.components.streams_bootstrap.app_type import AppType -from kpops.components.streams_bootstrap.producer.model import ProducerValues +from kpops.components.streams_bootstrap.producer.model import ProducerAppValues from kpops.utils.docstring import describe_attr @@ -25,7 +25,7 @@ class ProducerApp(KafkaApp): :param from_: Producer doesn't support FromSection, defaults to None """ - app: ProducerValues = Field( + app: ProducerAppValues = Field( default=..., description=describe_attr("app", __doc__), ) diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 2c8b952ce..b52bc162c 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -5,7 +5,7 @@ from kpops.components.base_components.base_defaults_component import deduplicate from kpops.components.base_components.kafka_app import ( - KafkaAppConfig, + KafkaAppValues, KafkaStreamsConfig, ) from kpops.utils.docstring import describe_attr @@ -166,7 +166,7 @@ class StreamsAppAutoScaling(CamelCaseConfigModel, DescConfigModel): model_config = ConfigDict(extra="allow") -class StreamsAppConfig(KafkaAppConfig): +class StreamsAppValues(KafkaAppValues): """StreamsBoostrap app configurations. The attributes correspond to keys and values that are used as values for the streams bootstrap helm chart. diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index a466b4eba..e8a434b70 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -3,7 +3,7 @@ from kpops.components.base_components.kafka_app import KafkaApp from kpops.components.streams_bootstrap.app_type import AppType -from kpops.components.streams_bootstrap.streams.model import StreamsAppConfig +from kpops.components.streams_bootstrap.streams.model import StreamsAppValues from kpops.utils.docstring import describe_attr @@ -13,7 +13,7 @@ class StreamsApp(KafkaApp): :param app: Application-specific settings """ - app: StreamsAppConfig = Field( + app: StreamsAppValues = Field( default=..., description=describe_attr("app", __doc__), ) diff --git a/tests/components/test_helm_app.py b/tests/components/test_helm_app.py index 021617c94..8afef52d9 100644 --- a/tests/components/test_helm_app.py +++ b/tests/components/test_helm_app.py @@ -12,7 +12,7 @@ HelmUpgradeInstallFlags, RepoAuthFlags, ) -from kpops.components.base_components.helm_app import HelmApp, HelmAppConfig +from kpops.components.base_components.helm_app import HelmApp, HelmAppValues from kpops.config import KpopsConfig from kpops.utils.colorify import magentaify @@ -46,8 +46,8 @@ def log_info_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch("kpops.components.base_components.helm_app.log.info") @pytest.fixture() - def app_value(self) -> HelmAppConfig: - return HelmAppConfig(**{"foo": "test-value"}) + def app_value(self) -> HelmAppValues: + return HelmAppValues(**{"foo": "test-value"}) @pytest.fixture() def repo_config(self) -> HelmRepoConfig: @@ -58,7 +58,7 @@ def helm_app( self, config: KpopsConfig, handlers: ComponentHandlers, - app_value: HelmAppConfig, + app_value: HelmAppValues, repo_config: HelmRepoConfig, ) -> HelmApp: return HelmApp( @@ -105,7 +105,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( handlers: ComponentHandlers, helm_mock: MagicMock, mocker: MockerFixture, - app_value: HelmAppConfig, + app_value: HelmAppValues, ): repo_config = HelmRepoConfig( repository_name="test-repo", url="https://test.com/charts/" @@ -153,7 +153,7 @@ def test_should_deploy_app_with_local_helm_chart( config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, - app_value: HelmAppConfig, + app_value: HelmAppValues, ): class AppWithLocalChart(HelmApp): repo_config: None = None diff --git a/tests/components/test_kubernetes_app.py b/tests/components/test_kubernetes_app.py index 95ab11f6c..8c887221d 100644 --- a/tests/components/test_kubernetes_app.py +++ b/tests/components/test_kubernetes_app.py @@ -7,14 +7,14 @@ from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.kubernetes_app import ( KubernetesApp, - KubernetesAppConfig, + KubernetesAppValues, ) from kpops.config import KpopsConfig DEFAULTS_PATH = Path(__file__).parent / "resources" -class KubernetesTestValue(KubernetesAppConfig): +class KubernetesTestValue(KubernetesAppValues): foo: str From 934e4ededb7b5dace226304158412809bf82da97 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 13:51:46 +0100 Subject: [PATCH 04/27] Rename fixture --- tests/components/test_helm_app.py | 14 +++++++------- tests/components/test_kubernetes_app.py | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/components/test_helm_app.py b/tests/components/test_helm_app.py index 8afef52d9..e43c9de41 100644 --- a/tests/components/test_helm_app.py +++ b/tests/components/test_helm_app.py @@ -46,7 +46,7 @@ def log_info_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch("kpops.components.base_components.helm_app.log.info") @pytest.fixture() - def app_value(self) -> HelmAppValues: + def app_values(self) -> HelmAppValues: return HelmAppValues(**{"foo": "test-value"}) @pytest.fixture() @@ -58,14 +58,14 @@ def helm_app( self, config: KpopsConfig, handlers: ComponentHandlers, - app_value: HelmAppValues, + app_values: HelmAppValues, repo_config: HelmRepoConfig, ) -> HelmApp: return HelmApp( name="test-helm-app", config=config, handlers=handlers, - app=app_value, + app=app_values, namespace="test-namespace", repo_config=repo_config, ) @@ -105,7 +105,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( handlers: ComponentHandlers, helm_mock: MagicMock, mocker: MockerFixture, - app_value: HelmAppValues, + app_values: HelmAppValues, ): repo_config = HelmRepoConfig( repository_name="test-repo", url="https://test.com/charts/" @@ -114,7 +114,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( name="test-helm-app", config=config, handlers=handlers, - app=app_value, + app=app_values, namespace="test-namespace", repo_config=repo_config, version="3.4.5", @@ -153,7 +153,7 @@ def test_should_deploy_app_with_local_helm_chart( config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, - app_value: HelmAppValues, + app_values: HelmAppValues, ): class AppWithLocalChart(HelmApp): repo_config: None = None @@ -167,7 +167,7 @@ def helm_chart(self) -> str: name="test-app-with-local-chart", config=config, handlers=handlers, - app=app_value, + app=app_values, namespace="test-namespace", ) diff --git a/tests/components/test_kubernetes_app.py b/tests/components/test_kubernetes_app.py index 8c887221d..aeec95479 100644 --- a/tests/components/test_kubernetes_app.py +++ b/tests/components/test_kubernetes_app.py @@ -36,7 +36,7 @@ def log_info_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch("kpops.components.base_components.kubernetes_app.log.info") @pytest.fixture() - def app_value(self) -> KubernetesTestValue: + def app_values(self) -> KubernetesTestValue: return KubernetesTestValue(foo="foo") @pytest.fixture() @@ -44,13 +44,13 @@ def kubernetes_app( self, config: KpopsConfig, handlers: ComponentHandlers, - app_value: KubernetesTestValue, + app_values: KubernetesTestValue, ) -> KubernetesApp: return KubernetesApp( name="test-kubernetes-app", config=config, handlers=handlers, - app=app_value, + app=app_values, namespace="test-namespace", ) @@ -58,7 +58,7 @@ def test_should_raise_value_error_when_name_is_not_valid( self, config: KpopsConfig, handlers: ComponentHandlers, - app_value: KubernetesTestValue, + app_values: KubernetesTestValue, ): with pytest.raises( ValueError, match=r"The component name .* is invalid for Kubernetes." @@ -67,7 +67,7 @@ def test_should_raise_value_error_when_name_is_not_valid( name="Not-Compatible*", config=config, handlers=handlers, - app=app_value, + app=app_values, namespace="test-namespace", ) @@ -78,7 +78,7 @@ def test_should_raise_value_error_when_name_is_not_valid( name="snake_case*", config=config, handlers=handlers, - app=app_value, + app=app_values, namespace="test-namespace", ) @@ -86,6 +86,6 @@ def test_should_raise_value_error_when_name_is_not_valid( name="valid-name", config=config, handlers=handlers, - app=app_value, + app=app_values, namespace="test-namespace", ) From 1c4e3134b99de5ffb9964a4c9b16d7b70b620e83 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 13:56:36 +0100 Subject: [PATCH 05/27] Refactor --- kpops/component_handlers/kafka_connect/model.py | 11 ++--------- kpops/components/base_components/helm_app.py | 13 ++++++++++--- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/kpops/component_handlers/kafka_connect/model.py b/kpops/component_handlers/kafka_connect/model.py index a7ec45af9..2182ff1b6 100644 --- a/kpops/component_handlers/kafka_connect/model.py +++ b/kpops/component_handlers/kafka_connect/model.py @@ -11,6 +11,7 @@ from pydantic.json_schema import SkipJsonSchema from typing_extensions import override +from kpops.components.base_components.helm_app import HelmAppValues from kpops.utils.pydantic import ( CamelCaseConfigModel, DescConfigModel, @@ -99,14 +100,6 @@ class KafkaConnectResetterConfig(CamelCaseConfigModel): offset_topic: str | None = None -class KafkaConnectResetterValues(CamelCaseConfigModel): +class KafkaConnectResetterValues(HelmAppValues): connector_type: Literal["source", "sink"] config: KafkaConnectResetterConfig - name_override: str - - # TODO(Ivan Yordanov): Replace with a function decorated with `@model_serializer` - # BEWARE! All default values are enforced, hard to replicate without - # access to ``model_dump`` - @override - def model_dump(self, **_) -> dict[str, Any]: - return super().model_dump(by_alias=True, exclude_none=True) diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 4583b7acb..d0aac74d8 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -30,6 +30,15 @@ class HelmAppValues(KubernetesAppValues): name_override: str | None = None + # TODO(Ivan Yordanov): Replace with a function decorated with `@model_serializer` + # BEWARE! All default values are enforced, hard to replicate without + # access to ``model_dump`` + @override + def model_dump(self, **_) -> dict[str, Any]: + return super().model_dump( + by_alias=True, exclude_none=True, exclude_defaults=True + ) + class HelmApp(KubernetesApp): """Kubernetes app managed through Helm with an associated Helm chart. @@ -156,9 +165,7 @@ def to_helm_values(self) -> dict: """ if self.app.name_override is None: self.app.name_override = self.full_name - return self.app.model_dump( - by_alias=True, exclude_none=True, exclude_defaults=True - ) + return self.app.model_dump() def print_helm_diff(self, stdout: str) -> None: """Print the diff of the last and current release of this component. From 2309a7b1e66c0f71c3d3884b5790c6c90de33e51 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 14:04:12 +0100 Subject: [PATCH 06/27] Rename --- kpops/component_handlers/kafka_connect/model.py | 6 +++--- kpops/components/base_components/kafka_connector.py | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/kpops/component_handlers/kafka_connect/model.py b/kpops/component_handlers/kafka_connect/model.py index 2182ff1b6..59cdba7b9 100644 --- a/kpops/component_handlers/kafka_connect/model.py +++ b/kpops/component_handlers/kafka_connect/model.py @@ -93,13 +93,13 @@ class KafkaConnectConfigErrorResponse(BaseModel): configs: list[KafkaConnectConfigDescription] -class KafkaConnectResetterConfig(CamelCaseConfigModel): +class KafkaConnectorResetterConfig(CamelCaseConfigModel): brokers: str connector: str delete_consumer_group: bool | None = None offset_topic: str | None = None -class KafkaConnectResetterValues(HelmAppValues): +class KafkaConnectorResetterValues(HelmAppValues): connector_type: Literal["source", "sink"] - config: KafkaConnectResetterConfig + config: KafkaConnectorResetterConfig diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 7af2c5ae4..6f19754d2 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -20,9 +20,9 @@ from kpops.component_handlers.helm_wrapper.utils import trim_release_name from kpops.component_handlers.kafka_connect.model import ( KafkaConnectorConfig, + KafkaConnectorResetterConfig, + KafkaConnectorResetterValues, KafkaConnectorType, - KafkaConnectResetterConfig, - KafkaConnectResetterValues, ) from kpops.components.base_components.base_defaults_component import deduplicate from kpops.components.base_components.models.from_section import FromTopic @@ -176,7 +176,7 @@ def _run_connect_resetter( :param dry_run: If the cleanup should be run in dry run mode or not :param retain_clean_jobs: If the cleanup job should be kept - :param kwargs: Other values for the KafkaConnectResetter + :param kwargs: Other values for the KafkaConnectorResetter """ log.info( magentaify( @@ -237,8 +237,8 @@ def _get_kafka_connect_resetter_values( :return: The Helm chart values of the connector resetter """ return { - **KafkaConnectResetterValues( - config=KafkaConnectResetterConfig( + **KafkaConnectorResetterValues( + config=KafkaConnectorResetterConfig( connector=self.full_name, brokers=self.config.kafka_brokers, **kwargs, From f67a23f7c001794913aea3e419432c46e72c7ceb Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 16:27:40 +0100 Subject: [PATCH 07/27] Refactor streams-bootstrap cleanup --- kpops/components/base_components/helm_app.py | 6 - kpops/components/base_components/kafka_app.py | 124 ++++++++---------- .../producer/producer_app.py | 39 +++--- .../streams_bootstrap/streams/streams_app.py | 46 +++---- tests/components/test_streams_app.py | 2 +- 5 files changed, 107 insertions(+), 110 deletions(-) diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 36e3bd21b..d52976c7a 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -100,12 +100,6 @@ def helm_release_name(self) -> str: """The name for the Helm release. Can be overridden.""" return create_helm_release_name(self.full_name) - @property - def clean_release_name(self) -> str: - """The name for the Helm release for cleanup jobs. Can be overridden.""" - suffix = "-clean" - return create_helm_release_name(self.full_name + suffix, suffix) - @property def helm_chart(self) -> str: """Return component's Helm chart.""" diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index c7c983e0d..e4e376fe2 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -7,9 +7,11 @@ from typing_extensions import override from kpops.component_handlers.helm_wrapper.model import ( + HelmFlags, HelmRepoConfig, HelmUpgradeInstallFlags, ) +from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name from kpops.components.base_components.helm_app import HelmApp, HelmAppValues from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import CamelCaseConfigModel, DescConfigModel @@ -45,6 +47,54 @@ class KafkaAppValues(HelmAppValues): ) +class StreamsBootstrapHelmApp(HelmApp): + repo_config: HelmRepoConfig = Field( + default=HelmRepoConfig( + repository_name="bakdata-streams-bootstrap", + url="https://bakdata.github.io/streams-bootstrap/", + ), + description=describe_attr("repo_config", __doc__), + ) + + +class KafkaAppCleaner(StreamsBootstrapHelmApp): + @property + @override + def helm_chart(self) -> str: + raise NotImplementedError + + @property + @override + def helm_release_name(self) -> str: + suffix = "-clean" + return create_helm_release_name(self.full_name + suffix, suffix) + + @property + @override + def helm_flags(self) -> HelmFlags: + return HelmUpgradeInstallFlags( + create_namespace=self.config.create_namespace, + version=self.version, + wait=True, + wait_for_jobs=True, + ) + + def run(self, dry_run: bool) -> None: + """Clean an app using the respective cleanup job. + + :param dry_run: Dry run command + """ + log.info(f"Uninstall old cleanup job for {self.helm_release_name}") + self.destroy(dry_run=dry_run) + + log.info(f"Init cleanup job for {self.helm_release_name}") + self.deploy(dry_run=dry_run) + + if not self.config.retain_clean_jobs: + log.info(f"Uninstall cleanup job for {self.helm_release_name}") + self.destroy(dry_run=dry_run) + + class KafkaApp(HelmApp, ABC): """Base component for Kafka-based components. @@ -74,9 +124,14 @@ class KafkaApp(HelmApp, ABC): ) @property - def clean_up_helm_chart(self) -> str: - """Helm chart used to destroy and clean this component.""" - raise NotImplementedError + def _cleaner(self) -> KafkaAppCleaner: + return KafkaAppCleaner( + config=self.config, + handlers=self.handlers, + name=self.name, + namespace=self.namespace, + app=self.app, + ) @override def deploy(self, dry_run: bool) -> None: @@ -90,66 +145,3 @@ def deploy(self, dry_run: bool) -> None: to_section=self.to, dry_run=dry_run ) super().deploy(dry_run) - - def _run_clean_up_job( - self, - values: dict, - dry_run: bool, - retain_clean_jobs: bool = False, - ) -> None: - """Clean an app using the respective cleanup job. - - :param values: The value YAML for the chart - :param dry_run: Dry run command - :param retain_clean_jobs: Whether to retain the cleanup job, defaults to False - """ - log.info(f"Uninstall old cleanup job for {self.clean_release_name}") - - self.__uninstall_clean_up_job(self.clean_release_name, dry_run) - - log.info(f"Init cleanup job for {self.clean_release_name}") - - stdout = self.__install_clean_up_job(self.clean_release_name, values, dry_run) - - if dry_run: - self.dry_run_handler.print_helm_diff(stdout, self.clean_release_name, log) - - if not retain_clean_jobs: - log.info(f"Uninstall cleanup job for {self.clean_release_name}") - self.__uninstall_clean_up_job(self.clean_release_name, dry_run) - - def __uninstall_clean_up_job(self, release_name: str, dry_run: bool) -> None: - """Uninstall clean up job. - - :param release_name: Name of the Helm release - :param dry_run: Whether to do a dry run of the command - """ - self.helm.uninstall(self.namespace, release_name, dry_run) - - def __install_clean_up_job( - self, - release_name: str, - values: dict, - dry_run: bool, - ) -> str: - """Install clean up job. - - :param release_name: Name of the Helm release - :param suffix: Suffix to add to the release name, e.g. "-clean" - :param values: The Helm values for the chart - :param dry_run: Whether to do a dry run of the command - :return: Return the output of the installation - """ - return self.helm.upgrade_install( - release_name, - self.clean_up_helm_chart, - dry_run, - self.namespace, - values, - HelmUpgradeInstallFlags( - create_namespace=self.config.create_namespace, - version=self.version, - wait=True, - wait_for_jobs=True, - ), - ) diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index e37529bae..b92479e4c 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -1,9 +1,7 @@ -# from __future__ import annotations - from pydantic import Field from typing_extensions import override -from kpops.components.base_components.kafka_app import KafkaApp +from kpops.components.base_components.kafka_app import KafkaApp, KafkaAppCleaner from kpops.components.base_components.models.to_section import ( OutputTopicTypes, TopicConfig, @@ -13,6 +11,17 @@ from kpops.utils.docstring import describe_attr +class ProducerAppCleaner(KafkaAppCleaner): + app: ProducerAppValues + + @property + @override + def helm_chart(self) -> str: + return ( + f"{self.repo_config.repository_name}/{AppType.CLEANUP_PRODUCER_APP.value}" + ) + + class ProducerApp(KafkaApp): """Producer component. @@ -36,6 +45,16 @@ class ProducerApp(KafkaApp): description=describe_attr("from_", __doc__), ) + @property + def _cleaner(self) -> ProducerAppCleaner: + return ProducerAppCleaner( + config=self.config, + handlers=self.handlers, + name=self.name, + namespace=self.namespace, + app=self.app, + ) + @override def apply_to_outputs(self, name: str, topic: TopicConfig) -> None: match topic.type: @@ -58,17 +77,7 @@ def add_extra_output_topic(self, topic_name: str, role: str) -> None: def helm_chart(self) -> str: return f"{self.repo_config.repository_name}/{AppType.PRODUCER_APP.value}" - @property - @override - def clean_up_helm_chart(self) -> str: - return ( - f"{self.repo_config.repository_name}/{AppType.CLEANUP_PRODUCER_APP.value}" - ) - @override def clean(self, dry_run: bool) -> None: - self._run_clean_up_job( - values=self.to_helm_values(), - dry_run=dry_run, - retain_clean_jobs=self.config.retain_clean_jobs, - ) + self._cleaner.app.streams.delete_output = True # TODO: sensible? + self._cleaner.run(dry_run) diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index e8a434b70..10752f444 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -1,12 +1,21 @@ from pydantic import Field from typing_extensions import override -from kpops.components.base_components.kafka_app import KafkaApp +from kpops.components.base_components.kafka_app import KafkaApp, KafkaAppCleaner from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.streams.model import StreamsAppValues from kpops.utils.docstring import describe_attr +class StreamsAppCleaner(KafkaAppCleaner): + app: StreamsAppValues + + @property + @override + def helm_chart(self) -> str: + return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" + + class StreamsApp(KafkaApp): """StreamsApp component that configures a streams bootstrap app. @@ -18,6 +27,16 @@ class StreamsApp(KafkaApp): description=describe_attr("app", __doc__), ) + @property + def _cleaner(self) -> StreamsAppCleaner: + return StreamsAppCleaner( + config=self.config, + handlers=self.handlers, + name=self.name, + namespace=self.namespace, + app=self.app, + ) + @override def add_input_topics(self, topics: list[str]) -> None: self.app.streams.add_input_topics(topics) @@ -51,29 +70,12 @@ def add_extra_output_topic(self, topic_name: str, role: str) -> None: def helm_chart(self) -> str: return f"{self.repo_config.repository_name}/{AppType.STREAMS_APP.value}" - @property - @override - def clean_up_helm_chart(self) -> str: - return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" - @override def reset(self, dry_run: bool) -> None: - self.__run_streams_clean_up_job(dry_run, delete_output=False) + self._cleaner.app.streams.delete_output = False + self._cleaner.run(dry_run) @override def clean(self, dry_run: bool) -> None: - self.__run_streams_clean_up_job(dry_run, delete_output=True) - - def __run_streams_clean_up_job(self, dry_run: bool, delete_output: bool) -> None: - """Run clean job for this Streams app. - - :param dry_run: Whether to do a dry run of the command - :param delete_output: Whether to delete the output of the app that is being cleaned - """ - values = self.to_helm_values() - values["streams"]["deleteOutput"] = delete_output - self._run_clean_up_job( - values=values, - dry_run=dry_run, - retain_clean_jobs=self.config.retain_clean_jobs, - ) + self._cleaner.app.streams.delete_output = True + self._cleaner.run(dry_run) diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index ebd5cf7d9..429c6490a 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -371,7 +371,7 @@ def test_reset_when_dry_run_is_false( self, streams_app: StreamsApp, mocker: MockerFixture ): mock_helm_upgrade_install = mocker.patch.object( - streams_app.helm, "upgrade_install" + streams_app._cleaner.helm, "upgrade_install" ) mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall") From 0578cb06e060fa89b3112314545ee040fdd273c4 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 17:48:03 +0100 Subject: [PATCH 08/27] Update tests --- docs/docs/schema/pipeline.json | 13 ++ kpops/components/base_components/kafka_app.py | 10 +- .../producer/producer_app.py | 9 +- .../streams_bootstrap/streams/model.py | 4 + .../streams_bootstrap/streams/streams_app.py | 8 +- tests/components/test_producer_app.py | 140 ++++++++++-------- tests/components/test_streams_app.py | 138 +++++++++-------- 7 files changed, 187 insertions(+), 135 deletions(-) diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 77198a215..2a9c565d0 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -920,6 +920,19 @@ "title": "Config", "type": "object" }, + "deleteOutput": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Whether the output topics with their associated schemas and the consumer group should be deleted during the cleanup", + "title": "Deleteoutput" + }, "errorTopic": { "anyOf": [ { diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index e4e376fe2..630ab4d40 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -94,6 +94,8 @@ def run(self, dry_run: bool) -> None: log.info(f"Uninstall cleanup job for {self.helm_release_name}") self.destroy(dry_run=dry_run) + # def factory # TODO? + class KafkaApp(HelmApp, ABC): """Base component for Kafka-based components. @@ -125,13 +127,7 @@ class KafkaApp(HelmApp, ABC): @property def _cleaner(self) -> KafkaAppCleaner: - return KafkaAppCleaner( - config=self.config, - handlers=self.handlers, - name=self.name, - namespace=self.namespace, - app=self.app, - ) + raise NotImplementedError @override def deploy(self, dry_run: bool) -> None: diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index b92479e4c..31aad8938 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -1,3 +1,5 @@ +from functools import cached_property + from pydantic import Field from typing_extensions import override @@ -45,14 +47,12 @@ class ProducerApp(KafkaApp): description=describe_attr("from_", __doc__), ) - @property + @cached_property def _cleaner(self) -> ProducerAppCleaner: return ProducerAppCleaner( config=self.config, handlers=self.handlers, - name=self.name, - namespace=self.namespace, - app=self.app, + **self.model_dump(), ) @override @@ -79,5 +79,4 @@ def helm_chart(self) -> str: @override def clean(self, dry_run: bool) -> None: - self._cleaner.app.streams.delete_output = True # TODO: sensible? self._cleaner.run(dry_run) diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index b52bc162c..a162365fe 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -28,6 +28,7 @@ class StreamsConfig(KafkaStreamsConfig): :param output_topic: Output topic, defaults to None :param error_topic: Error topic, defaults to None :param config: Configuration, defaults to {} + :param delete_output: Whether the output topics with their associated schemas and the consumer group should be deleted during the cleanup, defaults to False """ input_topics: list[str] = Field( @@ -54,6 +55,9 @@ class StreamsConfig(KafkaStreamsConfig): config: dict[str, Any] = Field( default={}, description=describe_attr("config", __doc__) ) + delete_output: bool | None = Field( + default=None, description=describe_attr("delete_output", __doc__) + ) def add_input_topics(self, topics: list[str]) -> None: """Add given topics to the list of input topics. diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 10752f444..d0aadd294 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -1,3 +1,5 @@ +from functools import cached_property + from pydantic import Field from typing_extensions import override @@ -27,14 +29,12 @@ class StreamsApp(KafkaApp): description=describe_attr("app", __doc__), ) - @property + @cached_property def _cleaner(self) -> StreamsAppCleaner: return StreamsAppCleaner( config=self.config, handlers=self.handlers, - name=self.name, - namespace=self.namespace, - app=self.app, + **self.model_dump(), ) @override diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 07e78bb6a..b5de6b67b 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -168,11 +168,13 @@ def test_should_not_reset_producer_app( mocker: MockerFixture, ): mock_helm_upgrade_install = mocker.patch.object( - producer_app.helm, "upgrade_install" + producer_app._cleaner.helm, "upgrade_install" + ) + mock_helm_uninstall = mocker.patch.object( + producer_app._cleaner.helm, "uninstall" ) - mock_helm_uninstall = mocker.patch.object(producer_app.helm, "uninstall") mock_helm_print_helm_diff = mocker.patch.object( - producer_app.dry_run_handler, "print_helm_diff" + producer_app._cleaner.dry_run_handler, "print_helm_diff" ) mock = mocker.MagicMock() @@ -182,45 +184,55 @@ def test_should_not_reset_producer_app( producer_app.clean(dry_run=True) - assert mock.mock_calls == [ - mocker.call.helm_uninstall( - "test-namespace", - PRODUCER_APP_CLEAN_RELEASE_NAME, - True, - ), - mocker.call.helm_upgrade_install( - PRODUCER_APP_CLEAN_RELEASE_NAME, - "bakdata-streams-bootstrap/producer-app-cleanup-job", - True, - "test-namespace", - { - "nameOverride": PRODUCER_APP_FULL_NAME, - "streams": { - "brokers": "fake-broker:9092", - "outputTopic": "${output_topic_name}", + mock.assert_has_calls( + [ + mocker.call.helm_uninstall( + "test-namespace", + PRODUCER_APP_CLEAN_RELEASE_NAME, + True, + ), + ANY, # __bool__ + ANY, # __str__ + mocker.call.helm_upgrade_install( + PRODUCER_APP_CLEAN_RELEASE_NAME, + "bakdata-streams-bootstrap/producer-app-cleanup-job", + True, + "test-namespace", + { + "nameOverride": PRODUCER_APP_FULL_NAME, + "streams": { + "brokers": "fake-broker:9092", + "outputTopic": "${output_topic_name}", + }, }, - }, - HelmUpgradeInstallFlags(version="2.4.2", wait=True, wait_for_jobs=True), - ), - mocker.call.print_helm_diff( - ANY, - PRODUCER_APP_CLEAN_RELEASE_NAME, - logging.getLogger("KafkaApp"), - ), - mocker.call.helm_uninstall( - "test-namespace", - PRODUCER_APP_CLEAN_RELEASE_NAME, - True, - ), - ] + HelmUpgradeInstallFlags( + version="2.4.2", wait=True, wait_for_jobs=True + ), + ), + mocker.call.print_helm_diff( + ANY, + PRODUCER_APP_CLEAN_RELEASE_NAME, + logging.getLogger("HelmApp"), + ), + mocker.call.helm_uninstall( + "test-namespace", + PRODUCER_APP_CLEAN_RELEASE_NAME, + True, + ), + ANY, # __bool__ + ANY, # __str__ + ] + ) def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false( self, mocker: MockerFixture, producer_app: ProducerApp ): mock_helm_upgrade_install = mocker.patch.object( - producer_app.helm, "upgrade_install" + producer_app._cleaner.helm, "upgrade_install" + ) + mock_helm_uninstall = mocker.patch.object( + producer_app._cleaner.helm, "uninstall" ) - mock_helm_uninstall = mocker.patch.object(producer_app.helm, "uninstall") mock = mocker.MagicMock() mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") @@ -228,29 +240,37 @@ def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_w producer_app.clean(dry_run=False) - assert mock.mock_calls == [ - mocker.call.helm_uninstall( - "test-namespace", - PRODUCER_APP_CLEAN_RELEASE_NAME, - False, - ), - mocker.call.helm_upgrade_install( - PRODUCER_APP_CLEAN_RELEASE_NAME, - "bakdata-streams-bootstrap/producer-app-cleanup-job", - False, - "test-namespace", - { - "nameOverride": PRODUCER_APP_FULL_NAME, - "streams": { - "brokers": "fake-broker:9092", - "outputTopic": "${output_topic_name}", + mock.assert_has_calls( + [ + mocker.call.helm_uninstall( + "test-namespace", + PRODUCER_APP_CLEAN_RELEASE_NAME, + False, + ), + ANY, # __bool__ + ANY, # __str__ + mocker.call.helm_upgrade_install( + PRODUCER_APP_CLEAN_RELEASE_NAME, + "bakdata-streams-bootstrap/producer-app-cleanup-job", + False, + "test-namespace", + { + "nameOverride": PRODUCER_APP_FULL_NAME, + "streams": { + "brokers": "fake-broker:9092", + "outputTopic": "${output_topic_name}", + }, }, - }, - HelmUpgradeInstallFlags(version="2.4.2", wait=True, wait_for_jobs=True), - ), - mocker.call.helm_uninstall( - "test-namespace", - PRODUCER_APP_CLEAN_RELEASE_NAME, - False, - ), - ] + HelmUpgradeInstallFlags( + version="2.4.2", wait=True, wait_for_jobs=True + ), + ), + mocker.call.helm_uninstall( + "test-namespace", + PRODUCER_APP_CLEAN_RELEASE_NAME, + False, + ), + ANY, # __bool__ + ANY, # __str__ + ] + ) diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 429c6490a..1bdb8631d 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -1,5 +1,5 @@ from pathlib import Path -from unittest.mock import MagicMock +from unittest.mock import ANY, MagicMock import pytest from pytest_mock import MockerFixture @@ -17,6 +17,7 @@ TopicConfig, ToSection, ) +from kpops.components.streams_bootstrap.streams.streams_app import StreamsAppCleaner from kpops.config import KpopsConfig, TopicNameConfig DEFAULTS_PATH = Path(__file__).parent / "resources" @@ -370,10 +371,11 @@ def test_destroy(self, streams_app: StreamsApp, mocker: MockerFixture): def test_reset_when_dry_run_is_false( self, streams_app: StreamsApp, mocker: MockerFixture ): - mock_helm_upgrade_install = mocker.patch.object( - streams_app._cleaner.helm, "upgrade_install" - ) - mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall") + cleaner = streams_app._cleaner + assert isinstance(cleaner, StreamsAppCleaner) + + mock_helm_upgrade_install = mocker.patch.object(cleaner.helm, "upgrade_install") + mock_helm_uninstall = mocker.patch.object(cleaner.helm, "uninstall") mock = mocker.MagicMock() mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") @@ -382,33 +384,41 @@ def test_reset_when_dry_run_is_false( dry_run = False streams_app.reset(dry_run=dry_run) - assert mock.mock_calls == [ - mocker.call.helm_uninstall( - "test-namespace", - STREAMS_APP_CLEAN_RELEASE_NAME, - dry_run, - ), - mocker.call.helm_upgrade_install( - STREAMS_APP_CLEAN_RELEASE_NAME, - "bakdata-streams-bootstrap/streams-app-cleanup-job", - dry_run, - "test-namespace", - { - "nameOverride": STREAMS_APP_FULL_NAME, - "streams": { - "brokers": "fake-broker:9092", - "outputTopic": "${output_topic_name}", - "deleteOutput": False, + mock.assert_has_calls( + [ + mocker.call.helm_uninstall( + "test-namespace", + STREAMS_APP_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ # FIXME: why is this in the call stack? + ANY, # __str__ + mocker.call.helm_upgrade_install( + STREAMS_APP_CLEAN_RELEASE_NAME, + "bakdata-streams-bootstrap/streams-app-cleanup-job", + dry_run, + "test-namespace", + { + "nameOverride": STREAMS_APP_FULL_NAME, + "streams": { + "brokers": "fake-broker:9092", + "outputTopic": "${output_topic_name}", + "deleteOutput": False, + }, }, - }, - HelmUpgradeInstallFlags(version="2.9.0", wait=True, wait_for_jobs=True), - ), - mocker.call.helm_uninstall( - "test-namespace", - STREAMS_APP_CLEAN_RELEASE_NAME, - dry_run, - ), - ] + HelmUpgradeInstallFlags( + version="2.9.0", wait=True, wait_for_jobs=True + ), + ), + mocker.call.helm_uninstall( + "test-namespace", + STREAMS_APP_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ + ANY, # __str__ + ] + ) def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up( self, @@ -416,9 +426,11 @@ def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up( mocker: MockerFixture, ): mock_helm_upgrade_install = mocker.patch.object( - streams_app.helm, "upgrade_install" + streams_app._cleaner.helm, "upgrade_install" + ) + mock_helm_uninstall = mocker.patch.object( + streams_app._cleaner.helm, "uninstall" ) - mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall") mock = mocker.MagicMock() mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") @@ -427,30 +439,38 @@ def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up( dry_run = False streams_app.clean(dry_run=dry_run) - assert mock.mock_calls == [ - mocker.call.helm_uninstall( - "test-namespace", - STREAMS_APP_CLEAN_RELEASE_NAME, - dry_run, - ), - mocker.call.helm_upgrade_install( - STREAMS_APP_CLEAN_RELEASE_NAME, - "bakdata-streams-bootstrap/streams-app-cleanup-job", - dry_run, - "test-namespace", - { - "nameOverride": STREAMS_APP_FULL_NAME, - "streams": { - "brokers": "fake-broker:9092", - "outputTopic": "${output_topic_name}", - "deleteOutput": True, + mock.assert_has_calls( + [ + mocker.call.helm_uninstall( + "test-namespace", + STREAMS_APP_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ + ANY, # __str__ + mocker.call.helm_upgrade_install( + STREAMS_APP_CLEAN_RELEASE_NAME, + "bakdata-streams-bootstrap/streams-app-cleanup-job", + dry_run, + "test-namespace", + { + "nameOverride": STREAMS_APP_FULL_NAME, + "streams": { + "brokers": "fake-broker:9092", + "outputTopic": "${output_topic_name}", + "deleteOutput": True, + }, }, - }, - HelmUpgradeInstallFlags(version="2.9.0", wait=True, wait_for_jobs=True), - ), - mocker.call.helm_uninstall( - "test-namespace", - STREAMS_APP_CLEAN_RELEASE_NAME, - dry_run, - ), - ] + HelmUpgradeInstallFlags( + version="2.9.0", wait=True, wait_for_jobs=True + ), + ), + mocker.call.helm_uninstall( + "test-namespace", + STREAMS_APP_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ + ANY, # __str__ + ] + ) From 44195e74f738dc56571da49d9f167028443d5fc3 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 17:56:58 +0100 Subject: [PATCH 09/27] Inherit from common streams-bootstrap app --- kpops/components/base_components/kafka_app.py | 20 ++----------------- .../producer/producer_app.py | 5 ++++- .../streams_bootstrap/streams/streams_app.py | 5 ++++- 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 630ab4d40..5eb8feada 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -47,7 +47,7 @@ class KafkaAppValues(HelmAppValues): ) -class StreamsBootstrapHelmApp(HelmApp): +class StreamsBootstrapHelmApp(HelmApp, ABC): repo_config: HelmRepoConfig = Field( default=HelmRepoConfig( repository_name="bakdata-streams-bootstrap", @@ -94,18 +94,13 @@ def run(self, dry_run: bool) -> None: log.info(f"Uninstall cleanup job for {self.helm_release_name}") self.destroy(dry_run=dry_run) - # def factory # TODO? - -class KafkaApp(HelmApp, ABC): +class KafkaApp(StreamsBootstrapHelmApp, ABC): """Base component for Kafka-based components. Producer or streaming apps should inherit from this class. :param app: Application-specific settings - :param repo_config: Configuration of the Helm chart repo to be used for - deploying the component, - defaults to HelmRepoConfig(repository_name="bakdata-streams-bootstrap", url="https://bakdata.github.io/streams-bootstrap/") :param version: Helm chart version, defaults to "2.9.0" """ @@ -113,22 +108,11 @@ class KafkaApp(HelmApp, ABC): default=..., description=describe_attr("app", __doc__), ) - repo_config: HelmRepoConfig = Field( - default=HelmRepoConfig( - repository_name="bakdata-streams-bootstrap", - url="https://bakdata.github.io/streams-bootstrap/", - ), - description=describe_attr("repo_config", __doc__), - ) version: str | None = Field( default="2.9.0", description=describe_attr("version", __doc__), ) - @property - def _cleaner(self) -> KafkaAppCleaner: - raise NotImplementedError - @override def deploy(self, dry_run: bool) -> None: if self.to: diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 31aad8938..465e9fa24 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -3,7 +3,10 @@ from pydantic import Field from typing_extensions import override -from kpops.components.base_components.kafka_app import KafkaApp, KafkaAppCleaner +from kpops.components.base_components.kafka_app import ( + KafkaApp, + KafkaAppCleaner, +) from kpops.components.base_components.models.to_section import ( OutputTopicTypes, TopicConfig, diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index d0aadd294..deedbd137 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -3,7 +3,10 @@ from pydantic import Field from typing_extensions import override -from kpops.components.base_components.kafka_app import KafkaApp, KafkaAppCleaner +from kpops.components.base_components.kafka_app import ( + KafkaApp, + KafkaAppCleaner, +) from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.streams.model import StreamsAppValues from kpops.utils.docstring import describe_attr From a2a8418d0165ec34f7f9a7a82526e02dd0fed671 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 18:01:34 +0100 Subject: [PATCH 10/27] Remove desc --- docs/docs/schema/pipeline.json | 6 ++---- kpops/components/base_components/kafka_app.py | 3 +-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 2a9c565d0..1d15880cb 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -500,8 +500,7 @@ }, "repository_name": "bakdata-streams-bootstrap", "url": "https://bakdata.github.io/streams-bootstrap/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" + } }, "to": { "anyOf": [ @@ -744,8 +743,7 @@ }, "repository_name": "bakdata-streams-bootstrap", "url": "https://bakdata.github.io/streams-bootstrap/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" + } }, "to": { "anyOf": [ diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 5eb8feada..ca7eb28a3 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -52,8 +52,7 @@ class StreamsBootstrapHelmApp(HelmApp, ABC): default=HelmRepoConfig( repository_name="bakdata-streams-bootstrap", url="https://bakdata.github.io/streams-bootstrap/", - ), - description=describe_attr("repo_config", __doc__), + ) ) From 3457d1f26c497faff560c938d1837b8e393359a4 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 18:22:42 +0100 Subject: [PATCH 11/27] Fix failing hooks --- docs/docs/schema/pipeline.json | 6 +++-- kpops/components/base_components/kafka_app.py | 24 +++++++++++-------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 1d15880cb..2a9c565d0 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -500,7 +500,8 @@ }, "repository_name": "bakdata-streams-bootstrap", "url": "https://bakdata.github.io/streams-bootstrap/" - } + }, + "description": "Configuration of the Helm chart repo to be used for deploying the component" }, "to": { "anyOf": [ @@ -743,7 +744,8 @@ }, "repository_name": "bakdata-streams-bootstrap", "url": "https://bakdata.github.io/streams-bootstrap/" - } + }, + "description": "Configuration of the Helm chart repo to be used for deploying the component" }, "to": { "anyOf": [ diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index ca7eb28a3..584f57959 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -18,6 +18,11 @@ log = logging.getLogger("KafkaApp") +STREAMS_BOOTSTRAP_HELM_REPO = HelmRepoConfig( + repository_name="bakdata-streams-bootstrap", + url="https://bakdata.github.io/streams-bootstrap/", +) + class KafkaStreamsConfig(CamelCaseConfigModel, DescConfigModel): """Kafka Streams config. @@ -47,16 +52,9 @@ class KafkaAppValues(HelmAppValues): ) -class StreamsBootstrapHelmApp(HelmApp, ABC): - repo_config: HelmRepoConfig = Field( - default=HelmRepoConfig( - repository_name="bakdata-streams-bootstrap", - url="https://bakdata.github.io/streams-bootstrap/", - ) - ) - +class KafkaAppCleaner(HelmApp): + repo_config: HelmRepoConfig = Field(default=STREAMS_BOOTSTRAP_HELM_REPO) -class KafkaAppCleaner(StreamsBootstrapHelmApp): @property @override def helm_chart(self) -> str: @@ -94,12 +92,14 @@ def run(self, dry_run: bool) -> None: self.destroy(dry_run=dry_run) -class KafkaApp(StreamsBootstrapHelmApp, ABC): +class KafkaApp(HelmApp, ABC): """Base component for Kafka-based components. Producer or streaming apps should inherit from this class. :param app: Application-specific settings + :param repo_config: Configuration of the Helm chart repo to be used for + deploying the component, defaults to streams-bootstrap Helm repo :param version: Helm chart version, defaults to "2.9.0" """ @@ -107,6 +107,10 @@ class KafkaApp(StreamsBootstrapHelmApp, ABC): default=..., description=describe_attr("app", __doc__), ) + repo_config: HelmRepoConfig = Field( + default=STREAMS_BOOTSTRAP_HELM_REPO, + description=describe_attr("repo_config", __doc__), + ) version: str | None = Field( default="2.9.0", description=describe_attr("version", __doc__), From a725f030d5170b2fbcb6b1cde487116b4d9ce976 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 18:33:00 +0100 Subject: [PATCH 12/27] Cosmetic --- kpops/components/base_components/kafka_app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 584f57959..9ea6da342 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -53,7 +53,7 @@ class KafkaAppValues(HelmAppValues): class KafkaAppCleaner(HelmApp): - repo_config: HelmRepoConfig = Field(default=STREAMS_BOOTSTRAP_HELM_REPO) + repo_config: HelmRepoConfig = STREAMS_BOOTSTRAP_HELM_REPO @property @override From 26ac7f9297538b6dacd05d0a26e10d285c06acad Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 18:33:52 +0100 Subject: [PATCH 13/27] Cosmetic --- kpops/components/base_components/kafka_app.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 9ea6da342..f4a205080 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -82,14 +82,14 @@ def run(self, dry_run: bool) -> None: :param dry_run: Dry run command """ log.info(f"Uninstall old cleanup job for {self.helm_release_name}") - self.destroy(dry_run=dry_run) + self.destroy(dry_run) log.info(f"Init cleanup job for {self.helm_release_name}") - self.deploy(dry_run=dry_run) + self.deploy(dry_run) if not self.config.retain_clean_jobs: log.info(f"Uninstall cleanup job for {self.helm_release_name}") - self.destroy(dry_run=dry_run) + self.destroy(dry_run) class KafkaApp(HelmApp, ABC): From b193c039e93da317b9c6c75afb77a1aee42231c2 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Tue, 2 Jan 2024 18:36:50 +0100 Subject: [PATCH 14/27] Rename run to clean --- kpops/components/base_components/kafka_app.py | 5 +++-- kpops/components/streams_bootstrap/producer/producer_app.py | 2 +- kpops/components/streams_bootstrap/streams/streams_app.py | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index f4a205080..4edddd0ef 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -76,8 +76,9 @@ def helm_flags(self) -> HelmFlags: wait_for_jobs=True, ) - def run(self, dry_run: bool) -> None: - """Clean an app using the respective cleanup job. + @override + def clean(self, dry_run: bool) -> None: + """Clean an app using a cleanup job. :param dry_run: Dry run command """ diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 465e9fa24..eca7c4395 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -82,4 +82,4 @@ def helm_chart(self) -> str: @override def clean(self, dry_run: bool) -> None: - self._cleaner.run(dry_run) + self._cleaner.clean(dry_run) diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index deedbd137..41693f083 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -76,9 +76,9 @@ def helm_chart(self) -> str: @override def reset(self, dry_run: bool) -> None: self._cleaner.app.streams.delete_output = False - self._cleaner.run(dry_run) + self._cleaner.clean(dry_run) @override def clean(self, dry_run: bool) -> None: self._cleaner.app.streams.delete_output = True - self._cleaner.run(dry_run) + self._cleaner.clean(dry_run) From 9d27adc1aac71642404a6b7eca577627a151fd30 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 3 Jan 2024 09:59:15 +0100 Subject: [PATCH 15/27] Fix returned object type --- kpops/components/base_components/kafka_app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 4edddd0ef..a0a8784b5 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -9,7 +9,6 @@ from kpops.component_handlers.helm_wrapper.model import ( HelmFlags, HelmRepoConfig, - HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name from kpops.components.base_components.helm_app import HelmApp, HelmAppValues @@ -69,7 +68,7 @@ def helm_release_name(self) -> str: @property @override def helm_flags(self) -> HelmFlags: - return HelmUpgradeInstallFlags( + return HelmFlags( create_namespace=self.config.create_namespace, version=self.version, wait=True, From c517a3b89301ea886cc57a191d3b5b8f3da62db1 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 3 Jan 2024 10:08:08 +0100 Subject: [PATCH 16/27] Add pydocs and todo --- kpops/components/base_components/kafka_app.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index a0a8784b5..8cc1c5c7e 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -52,7 +52,10 @@ class KafkaAppValues(HelmAppValues): class KafkaAppCleaner(HelmApp): + """Helm app for resetting and cleaning a streams-bootstrap app.""" + repo_config: HelmRepoConfig = STREAMS_BOOTSTRAP_HELM_REPO + # TODO: streams-bootstrap version? @property @override From 11dcc1e6d048a95912b33546528623ccfc9510a0 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 3 Jan 2024 16:45:29 +0100 Subject: [PATCH 17/27] Create streams-bootstrap base for all components based on its Helm charts --- ...aults_pipeline_component_dependencies.yaml | 4 +- .../dependencies/kpops_structure.yaml | 21 ++++++--- .../pipeline_component_dependencies.yaml | 11 ++--- .../pipeline-components/kafka-app.yaml | 12 ------ .../pipeline-components/pipeline.yaml | 20 ++------- .../pipeline-components/producer-app.yaml | 4 +- .../pipeline-components/streams-app.yaml | 4 +- .../pipeline-defaults/defaults-kafka-app.yaml | 43 ++++++++++++++++++- .../resources/pipeline-defaults/defaults.yaml | 43 ++++++++++++++++++- docs/docs/schema/pipeline.json | 18 ++++---- kpops/components/__init__.py | 5 ++- kpops/components/base_components/kafka_app.py | 28 +++--------- .../components/streams_bootstrap/__init__.py | 36 +++++++++++++--- .../producer/producer_app.py | 7 +-- .../streams_bootstrap/streams/model.py | 6 +-- .../streams_bootstrap/streams/streams_app.py | 5 ++- tests/cli/test_registry.py | 3 +- ...kafka_app.py => test_streams_bootstrap.py} | 35 +++++++-------- tests/pipeline/test_components/components.py | 3 +- 19 files changed, 189 insertions(+), 119 deletions(-) rename tests/components/{test_kafka_app.py => test_streams_bootstrap.py} (75%) diff --git a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml index 4e12885af..959596df0 100644 --- a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml @@ -2,8 +2,10 @@ helm-app.yaml: - app-helm-app.yaml - repo_config-helm-app.yaml kafka-app.yaml: +- prefix.yaml +- from_.yaml +- to.yaml - app-kafka-app.yaml -- version-kafka-app.yaml kafka-connector.yaml: - prefix.yaml - from_.yaml diff --git a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml index 70dc43870..0d553845b 100644 --- a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml @@ -13,10 +13,7 @@ kpops_components_fields: - prefix - from_ - to - - namespace - app - - repo_config - - version kafka-connector: - name - prefix @@ -65,8 +62,8 @@ kpops_components_fields: - prefix - from_ - to - - namespace - app + - namespace - repo_config - version streams-app: @@ -74,17 +71,27 @@ kpops_components_fields: - prefix - from_ - to + - app + - namespace + - repo_config + - version + streams-bootstrap: + - name + - prefix + - from_ + - to - namespace - app - repo_config - version kpops_components_inheritance_ref: helm-app: kubernetes-app - kafka-app: helm-app + kafka-app: pipeline-component kafka-connector: pipeline-component kafka-sink-connector: kafka-connector kafka-source-connector: kafka-connector kubernetes-app: pipeline-component pipeline-component: base-defaults-component - producer-app: kafka-app - streams-app: kafka-app + producer-app: streams-bootstrap + streams-app: streams-bootstrap + streams-bootstrap: helm-app diff --git a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml index 8504a0135..1127dda94 100644 --- a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml @@ -10,10 +10,7 @@ kafka-app.yaml: - prefix.yaml - from_.yaml - to.yaml -- namespace.yaml - app-kafka-app.yaml -- repo_config-helm-app.yaml -- version-kafka-app.yaml kafka-connector.yaml: - prefix.yaml - from_.yaml @@ -52,15 +49,15 @@ producer-app.yaml: - prefix.yaml - from_-producer-app.yaml - to.yaml -- namespace.yaml - app-producer-app.yaml +- namespace.yaml - repo_config-helm-app.yaml -- version-kafka-app.yaml +- version.yaml streams-app.yaml: - prefix.yaml - from_.yaml - to.yaml -- namespace.yaml - app-streams-app.yaml +- namespace.yaml - repo_config-helm-app.yaml -- version-kafka-app.yaml +- version.yaml diff --git a/docs/docs/resources/pipeline-components/kafka-app.yaml b/docs/docs/resources/pipeline-components/kafka-app.yaml index cdc49ef28..ff2b5500c 100644 --- a/docs/docs/resources/pipeline-components/kafka-app.yaml +++ b/docs/docs/resources/pipeline-components/kafka-app.yaml @@ -44,7 +44,6 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` can contain application-specific settings, hence the user is free to # add the key-value pairs they need. app: # required @@ -53,14 +52,3 @@ schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app - # Helm repository configuration (optional) - # If not set the helm repo add will not be called. Useful when using local Helm charts - repo_config: - repository_name: bakdata-streams-bootstrap # required - url: https://bakdata.github.io/streams-bootstrap/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "2.12.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-components/pipeline.yaml b/docs/docs/resources/pipeline-components/pipeline.yaml index 1c6350fbc..52245a05b 100644 --- a/docs/docs/resources/pipeline-components/pipeline.yaml +++ b/docs/docs/resources/pipeline-components/pipeline.yaml @@ -107,7 +107,6 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` can contain application-specific settings, hence the user is free to # add the key-value pairs they need. app: # required @@ -116,17 +115,6 @@ schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app - # Helm repository configuration (optional) - # If not set the helm repo add will not be called. Useful when using local Helm charts - repo_config: - repository_name: bakdata-streams-bootstrap # required - url: https://bakdata.github.io/streams-bootstrap/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "2.12.0" # Helm chart version # Kafka sink connector - type: kafka-sink-connector name: kafka-sink-connector # required @@ -322,7 +310,6 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required @@ -334,6 +321,7 @@ output_role1: output_topic1 output_role2: output_topic2 nameOverride: override-with-this-name # kafka-app-specific + namespace: namespace # required # Helm repository configuration (optional) # If not set the helm repo add will not be called. Useful when using local Helm charts repo_config: @@ -344,7 +332,7 @@ password: pass ca_file: /home/user/path/to/ca-file insecure_skip_tls_verify: false - version: "2.12.0" # Helm chart version + version: "1.0.0" # Helm chart version # StreamsApp component that configures a streams bootstrap app. # More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap - type: streams-app # required @@ -391,7 +379,6 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # No arbitrary keys are allowed under `app`here # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app @@ -448,6 +435,7 @@ topics: # List of auto-generated Kafka Streams topics used by the streams app. - topic1 - topic2 + namespace: namespace # required # Helm repository configuration (optional) # If not set the helm repo add will not be called. Useful when using local Helm charts repo_config: @@ -458,4 +446,4 @@ password: pass ca_file: /home/user/path/to/ca-file insecure_skip_tls_verify: false - version: "2.12.0" # Helm chart version + version: "1.0.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-components/producer-app.yaml b/docs/docs/resources/pipeline-components/producer-app.yaml index 5be3551d8..9a698e1b3 100644 --- a/docs/docs/resources/pipeline-components/producer-app.yaml +++ b/docs/docs/resources/pipeline-components/producer-app.yaml @@ -27,7 +27,6 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required @@ -39,6 +38,7 @@ output_role1: output_topic1 output_role2: output_topic2 nameOverride: override-with-this-name # kafka-app-specific + namespace: namespace # required # Helm repository configuration (optional) # If not set the helm repo add will not be called. Useful when using local Helm charts repo_config: @@ -49,4 +49,4 @@ password: pass ca_file: /home/user/path/to/ca-file insecure_skip_tls_verify: false - version: "2.12.0" # Helm chart version + version: "1.0.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-components/streams-app.yaml b/docs/docs/resources/pipeline-components/streams-app.yaml index f77edf80c..c333631ef 100644 --- a/docs/docs/resources/pipeline-components/streams-app.yaml +++ b/docs/docs/resources/pipeline-components/streams-app.yaml @@ -44,7 +44,6 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # No arbitrary keys are allowed under `app`here # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app @@ -101,6 +100,7 @@ topics: # List of auto-generated Kafka Streams topics used by the streams app. - topic1 - topic2 + namespace: namespace # required # Helm repository configuration (optional) # If not set the helm repo add will not be called. Useful when using local Helm charts repo_config: @@ -111,4 +111,4 @@ password: pass ca_file: /home/user/path/to/ca-file insecure_skip_tls_verify: false - version: "2.12.0" # Helm chart version + version: "1.0.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml index bd6c9e2d9..d37dad1bb 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml @@ -3,6 +3,48 @@ # Parent of: ProducerApp, StreamsApp # Child of: KubernetesApp kafka-app: + # Pipeline prefix that will prefix every component name. If you wish to not + # have any prefix you can specify an empty string. + prefix: ${pipeline_name}- + from: # Must not be null + topics: # read from topic + ${pipeline_name}-input-topic: + type: input # Implied when role is NOT specified + ${pipeline_name}-extra-topic: + role: topic-role # Implies `type` to be extra + ${pipeline_name}-input-pattern-topic: + type: pattern # Implied to be an input pattern if `role` is undefined + ${pipeline_name}-extra-pattern-topic: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + components: # read from specific component + account-producer: + type: output # Implied when role is NOT specified + other-producer: + role: some-role # Implies `type` to be extra + component-as-input-pattern: + type: pattern # Implied to be an input pattern if `role` is undefined + component-as-extra-pattern: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + # Topic(s) into which the component will write output + to: + topics: + ${pipeline_name}-output-topic: + type: output # Implied when role is NOT specified + ${pipeline_name}-extra-topic: + role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined + ${pipeline_name}-error-topic: + type: error + # Currently KPOps supports Avro and JSON schemas. + key_schema: key-schema # must implement SchemaProvider to use + value_schema: value-schema + partitions_count: 1 + replication_factor: 1 + configs: # https://kafka.apache.org/documentation/#topicconfigs + cleanup.policy: compact + models: # SchemaProvider is initiated with the values given here + model: model # `app` can contain application-specific settings, hence the user is free to # add the key-value pairs they need. app: # required @@ -11,4 +53,3 @@ kafka-app: schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app - version: "2.12.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-defaults/defaults.yaml b/docs/docs/resources/pipeline-defaults/defaults.yaml index 58b22d3f3..5c71248c2 100644 --- a/docs/docs/resources/pipeline-defaults/defaults.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults.yaml @@ -24,6 +24,48 @@ helm-app: # Parent of: ProducerApp, StreamsApp # Child of: KubernetesApp kafka-app: + # Pipeline prefix that will prefix every component name. If you wish to not + # have any prefix you can specify an empty string. + prefix: ${pipeline_name}- + from: # Must not be null + topics: # read from topic + ${pipeline_name}-input-topic: + type: input # Implied when role is NOT specified + ${pipeline_name}-extra-topic: + role: topic-role # Implies `type` to be extra + ${pipeline_name}-input-pattern-topic: + type: pattern # Implied to be an input pattern if `role` is undefined + ${pipeline_name}-extra-pattern-topic: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + components: # read from specific component + account-producer: + type: output # Implied when role is NOT specified + other-producer: + role: some-role # Implies `type` to be extra + component-as-input-pattern: + type: pattern # Implied to be an input pattern if `role` is undefined + component-as-extra-pattern: + type: pattern # Implied to be an extra pattern if `role` is defined + role: some-role + # Topic(s) into which the component will write output + to: + topics: + ${pipeline_name}-output-topic: + type: output # Implied when role is NOT specified + ${pipeline_name}-extra-topic: + role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined + ${pipeline_name}-error-topic: + type: error + # Currently KPOps supports Avro and JSON schemas. + key_schema: key-schema # must implement SchemaProvider to use + value_schema: value-schema + partitions_count: 1 + replication_factor: 1 + configs: # https://kafka.apache.org/documentation/#topicconfigs + cleanup.policy: compact + models: # SchemaProvider is initiated with the values given here + model: model # `app` can contain application-specific settings, hence the user is free to # add the key-value pairs they need. app: # required @@ -32,7 +74,6 @@ kafka-app: schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app - version: "2.12.0" # Helm chart version # Kafka connector # # Parent of: KafkaSinkConnector, KafkaSourceConnector diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 2a9c565d0..9df7d8342 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -452,7 +452,7 @@ }, "ProducerApp": { "additionalProperties": true, - "description": "Producer component.\nThis producer holds configuration to use as values for the streams bootstrap producer helm chart. Note that the producer does not support error topics.", + "description": "Producer component.\nThis producer holds configuration to use as values for the streams-bootstrap producer Helm chart. Note that the producer does not support error topics.", "properties": { "app": { "allOf": [ @@ -531,8 +531,8 @@ }, "required": [ "name", - "namespace", - "app" + "app", + "namespace" ], "title": "ProducerApp", "type": "object" @@ -689,7 +689,7 @@ }, "StreamsApp": { "additionalProperties": true, - "description": "StreamsApp component that configures a streams bootstrap app.", + "description": "StreamsApp component that configures a streams-bootstrap app.", "properties": { "app": { "allOf": [ @@ -775,8 +775,8 @@ }, "required": [ "name", - "namespace", - "app" + "app", + "namespace" ], "title": "StreamsApp", "type": "object" @@ -863,7 +863,7 @@ }, "StreamsAppValues": { "additionalProperties": true, - "description": "StreamsBoostrap app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", + "description": "streams-bootstrap app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", "properties": { "autoscaling": { "anyOf": [ @@ -875,7 +875,7 @@ } ], "default": null, - "description": "Kubernetes Event-driven Autoscaling config" + "description": "Kubernetes event-driven autoscaling config" }, "nameOverride": { "anyOf": [ @@ -896,7 +896,7 @@ "$ref": "#/$defs/StreamsConfig" } ], - "description": "Streams Bootstrap streams section" + "description": "streams-bootstrap streams section" } }, "required": [ diff --git a/kpops/components/__init__.py b/kpops/components/__init__.py index 98e1d3530..dc5fcee9c 100644 --- a/kpops/components/__init__.py +++ b/kpops/components/__init__.py @@ -7,7 +7,9 @@ PipelineComponent, ) from kpops.components.base_components.kafka_connector import KafkaConnector -from kpops.components.streams_bootstrap import ProducerApp, StreamsApp +from kpops.components.streams_bootstrap import StreamsBootstrap +from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp +from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp __all__ = ( "HelmApp", @@ -16,6 +18,7 @@ "KafkaSinkConnector", "KafkaSourceConnector", "KubernetesApp", + "StreamsBootstrap", "ProducerApp", "StreamsApp", "PipelineComponent", diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index 8cc1c5c7e..7ee67b09c 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -8,20 +8,16 @@ from kpops.component_handlers.helm_wrapper.model import ( HelmFlags, - HelmRepoConfig, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components.base_components.helm_app import HelmApp, HelmAppValues +from kpops.components.base_components.helm_app import HelmAppValues +from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.components.streams_bootstrap import StreamsBootstrap from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import CamelCaseConfigModel, DescConfigModel log = logging.getLogger("KafkaApp") -STREAMS_BOOTSTRAP_HELM_REPO = HelmRepoConfig( - repository_name="bakdata-streams-bootstrap", - url="https://bakdata.github.io/streams-bootstrap/", -) - class KafkaStreamsConfig(CamelCaseConfigModel, DescConfigModel): """Kafka Streams config. @@ -51,12 +47,9 @@ class KafkaAppValues(HelmAppValues): ) -class KafkaAppCleaner(HelmApp): +class KafkaAppCleaner(StreamsBootstrap): """Helm app for resetting and cleaning a streams-bootstrap app.""" - repo_config: HelmRepoConfig = STREAMS_BOOTSTRAP_HELM_REPO - # TODO: streams-bootstrap version? - @property @override def helm_chart(self) -> str: @@ -95,29 +88,18 @@ def clean(self, dry_run: bool) -> None: self.destroy(dry_run) -class KafkaApp(HelmApp, ABC): +class KafkaApp(PipelineComponent, ABC): """Base component for Kafka-based components. Producer or streaming apps should inherit from this class. :param app: Application-specific settings - :param repo_config: Configuration of the Helm chart repo to be used for - deploying the component, defaults to streams-bootstrap Helm repo - :param version: Helm chart version, defaults to "2.9.0" """ app: KafkaAppValues = Field( default=..., description=describe_attr("app", __doc__), ) - repo_config: HelmRepoConfig = Field( - default=STREAMS_BOOTSTRAP_HELM_REPO, - description=describe_attr("repo_config", __doc__), - ) - version: str | None = Field( - default="2.9.0", - description=describe_attr("version", __doc__), - ) @override def deploy(self, dry_run: bool) -> None: diff --git a/kpops/components/streams_bootstrap/__init__.py b/kpops/components/streams_bootstrap/__init__.py index 097d85b13..1b02b091b 100644 --- a/kpops/components/streams_bootstrap/__init__.py +++ b/kpops/components/streams_bootstrap/__init__.py @@ -1,7 +1,31 @@ -from kpops.components.streams_bootstrap.producer.producer_app import ProducerApp -from kpops.components.streams_bootstrap.streams.streams_app import StreamsApp +from abc import ABC -__all__ = [ - "ProducerApp", - "StreamsApp", -] +from pydantic import Field + +from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig +from kpops.components.base_components.helm_app import HelmApp +from kpops.utils.docstring import describe_attr + +STREAMS_BOOTSTRAP_HELM_REPO = HelmRepoConfig( + repository_name="bakdata-streams-bootstrap", + url="https://bakdata.github.io/streams-bootstrap/", +) +STREAMS_BOOTSTRAP_VERSION = "2.9.0" + + +class StreamsBootstrap(HelmApp, ABC): + """Base for components with a streams-bootstrap Helm chart. + + :param repo_config: Configuration of the Helm chart repo to be used for + deploying the component, defaults to streams-bootstrap Helm repo + :param version: Helm chart version, defaults to "2.9.0" + """ + + repo_config: HelmRepoConfig = Field( + default=STREAMS_BOOTSTRAP_HELM_REPO, + description=describe_attr("repo_config", __doc__), + ) + version: str | None = Field( + default=STREAMS_BOOTSTRAP_VERSION, + description=describe_attr("version", __doc__), + ) diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index eca7c4395..355c31cd3 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -11,6 +11,7 @@ OutputTopicTypes, TopicConfig, ) +from kpops.components.streams_bootstrap import StreamsBootstrap from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.producer.model import ProducerAppValues from kpops.utils.docstring import describe_attr @@ -27,11 +28,11 @@ def helm_chart(self) -> str: ) -class ProducerApp(KafkaApp): +class ProducerApp(StreamsBootstrap, KafkaApp): """Producer component. - This producer holds configuration to use as values for the streams bootstrap - producer helm chart. + This producer holds configuration to use as values for the streams-bootstrap + producer Helm chart. Note that the producer does not support error topics. diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index a162365fe..0447cf272 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -171,12 +171,12 @@ class StreamsAppAutoScaling(CamelCaseConfigModel, DescConfigModel): class StreamsAppValues(KafkaAppValues): - """StreamsBoostrap app configurations. + """streams-bootstrap app configurations. The attributes correspond to keys and values that are used as values for the streams bootstrap helm chart. - :param streams: Streams Bootstrap streams section - :param autoscaling: Kubernetes Event-driven Autoscaling config, defaults to None + :param streams: streams-bootstrap streams section + :param autoscaling: Kubernetes event-driven autoscaling config, defaults to None """ streams: StreamsConfig = Field( diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 41693f083..beadd574c 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -7,6 +7,7 @@ KafkaApp, KafkaAppCleaner, ) +from kpops.components.streams_bootstrap import StreamsBootstrap from kpops.components.streams_bootstrap.app_type import AppType from kpops.components.streams_bootstrap.streams.model import StreamsAppValues from kpops.utils.docstring import describe_attr @@ -21,8 +22,8 @@ def helm_chart(self) -> str: return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" -class StreamsApp(KafkaApp): - """StreamsApp component that configures a streams bootstrap app. +class StreamsApp(StreamsBootstrap, KafkaApp): + """StreamsApp component that configures a streams-bootstrap app. :param app: Application-specific settings """ diff --git a/tests/cli/test_registry.py b/tests/cli/test_registry.py index bc6a7a2f9..473c340c4 100644 --- a/tests/cli/test_registry.py +++ b/tests/cli/test_registry.py @@ -36,7 +36,7 @@ def test_find_builtin_classes(): class_.__name__ for class_ in _find_classes("kpops.components", PipelineComponent) ] - assert len(components) == 9 + assert len(components) == 10 assert components == [ "HelmApp", "KafkaApp", @@ -47,6 +47,7 @@ def test_find_builtin_classes(): "PipelineComponent", "ProducerApp", "StreamsApp", + "StreamsBootstrap", ] diff --git a/tests/components/test_kafka_app.py b/tests/components/test_streams_bootstrap.py similarity index 75% rename from tests/components/test_kafka_app.py rename to tests/components/test_streams_bootstrap.py index d7e8fd5d4..9a53ef319 100644 --- a/tests/components/test_kafka_app.py +++ b/tests/components/test_streams_bootstrap.py @@ -11,13 +11,13 @@ HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name -from kpops.components.base_components import KafkaApp +from kpops.components.streams_bootstrap import StreamsBootstrap from kpops.config import KpopsConfig DEFAULTS_PATH = Path(__file__).parent / "resources" -class TestKafkaApp: +class TestStreamsBootstrap: @pytest.fixture() def config(self) -> KpopsConfig: return KpopsConfig( @@ -34,36 +34,29 @@ def handlers(self) -> ComponentHandlers: ) def test_default_configs(self, config: KpopsConfig, handlers: ComponentHandlers): - kafka_app = KafkaApp( + streams_bootstrap_helm_app = StreamsBootstrap( name="example-name", config=config, handlers=handlers, **{ "namespace": "test-namespace", - "app": { - "streams": { - "outputTopic": "test", - "brokers": "fake-broker:9092", - }, - }, + "app": {}, }, ) - assert kafka_app.app.streams.brokers == "fake-broker:9092" - - assert kafka_app.repo_config == HelmRepoConfig( + assert streams_bootstrap_helm_app.repo_config == HelmRepoConfig( repository_name="bakdata-streams-bootstrap", url="https://bakdata.github.io/streams-bootstrap/", ) - assert kafka_app.version == "2.9.0" - assert kafka_app.namespace == "test-namespace" + assert streams_bootstrap_helm_app.version == "2.9.0" + assert streams_bootstrap_helm_app.namespace == "test-namespace" - def test_should_deploy_kafka_app( + def test_should_deploy_streams_bootstrap_helm_app( self, config: KpopsConfig, handlers: ComponentHandlers, mocker: MockerFixture, ): - kafka_app = KafkaApp( + streams_bootstrap_helm_app = StreamsBootstrap( name="example-name", config=config, handlers=handlers, @@ -78,18 +71,20 @@ def test_should_deploy_kafka_app( "version": "1.2.3", }, ) - helm_upgrade_install = mocker.patch.object(kafka_app.helm, "upgrade_install") + helm_upgrade_install = mocker.patch.object( + streams_bootstrap_helm_app.helm, "upgrade_install" + ) print_helm_diff = mocker.patch.object( - kafka_app.dry_run_handler, "print_helm_diff" + streams_bootstrap_helm_app.dry_run_handler, "print_helm_diff" ) mocker.patch.object( - KafkaApp, + StreamsBootstrap, "helm_chart", return_value="test/test-chart", new_callable=mocker.PropertyMock, ) - kafka_app.deploy(dry_run=True) + streams_bootstrap_helm_app.deploy(dry_run=True) print_helm_diff.assert_called_once() helm_upgrade_install.assert_called_once_with( diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index d45882ea1..585fd8407 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -5,7 +5,7 @@ Schema, SchemaProvider, ) -from kpops.components import KafkaSinkConnector +from kpops.components import KafkaSinkConnector, ProducerApp, StreamsApp from kpops.components.base_components import PipelineComponent from kpops.components.base_components.models import ModelName, ModelVersion, TopicName from kpops.components.base_components.models.to_section import ( @@ -13,7 +13,6 @@ TopicConfig, ToSection, ) -from kpops.components.streams_bootstrap import ProducerApp, StreamsApp class ScheduledProducer(ProducerApp): From d9786dda7e39e57ce004e17fc6a22339e7a673d1 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 3 Jan 2024 16:49:46 +0100 Subject: [PATCH 18/27] Cleanup KPOps components import --- tests/pipeline/test_components/components.py | 8 ++++++-- .../test_components_without_schema_handler/components.py | 9 ++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index 585fd8407..20f781545 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -5,8 +5,12 @@ Schema, SchemaProvider, ) -from kpops.components import KafkaSinkConnector, ProducerApp, StreamsApp -from kpops.components.base_components import PipelineComponent +from kpops.components import ( + KafkaSinkConnector, + PipelineComponent, + ProducerApp, + StreamsApp, +) from kpops.components.base_components.models import ModelName, ModelVersion, TopicName from kpops.components.base_components.models.to_section import ( OutputTopicTypes, diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index d5684178c..686aac26c 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -1,10 +1,13 @@ from typing_extensions import override from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig -from kpops.components import KafkaSinkConnector -from kpops.components.base_components import PipelineComponent +from kpops.components import ( + KafkaSinkConnector, + PipelineComponent, + ProducerApp, + StreamsApp, +) from kpops.components.base_components.models.to_section import OutputTopicTypes -from kpops.components.streams_bootstrap import ProducerApp, StreamsApp class ScheduledProducer(ProducerApp): From a49b551f2380d12ba44d1ddfc9fe07d70c7570fc Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 3 Jan 2024 16:53:09 +0100 Subject: [PATCH 19/27] Fix inheritance order of streams-boostrap apps --- .../dependencies/kpops_structure.yaml | 8 +++--- .../pipeline_component_dependencies.yaml | 10 +++---- .../pipeline-components/pipeline.yaml | 28 +++---------------- .../pipeline-components/producer-app.yaml | 14 ++-------- .../pipeline-components/streams-app.yaml | 14 ++-------- docs/docs/schema/pipeline.json | 8 +++--- .../producer/producer_app.py | 2 +- .../streams_bootstrap/streams/streams_app.py | 2 +- 8 files changed, 22 insertions(+), 64 deletions(-) diff --git a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml index 0d553845b..9c186896f 100644 --- a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml @@ -62,8 +62,8 @@ kpops_components_fields: - prefix - from_ - to - - app - namespace + - app - repo_config - version streams-app: @@ -71,8 +71,8 @@ kpops_components_fields: - prefix - from_ - to - - app - namespace + - app - repo_config - version streams-bootstrap: @@ -92,6 +92,6 @@ kpops_components_inheritance_ref: kafka-source-connector: kafka-connector kubernetes-app: pipeline-component pipeline-component: base-defaults-component - producer-app: streams-bootstrap - streams-app: streams-bootstrap + producer-app: kafka-app + streams-app: kafka-app streams-bootstrap: helm-app diff --git a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml index 1127dda94..3b706014c 100644 --- a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml @@ -49,15 +49,13 @@ producer-app.yaml: - prefix.yaml - from_-producer-app.yaml - to.yaml -- app-producer-app.yaml - namespace.yaml -- repo_config-helm-app.yaml -- version.yaml +- app-producer-app.yaml +- version-kafka-app.yaml streams-app.yaml: - prefix.yaml - from_.yaml - to.yaml -- app-streams-app.yaml - namespace.yaml -- repo_config-helm-app.yaml -- version.yaml +- app-streams-app.yaml +- version-kafka-app.yaml diff --git a/docs/docs/resources/pipeline-components/pipeline.yaml b/docs/docs/resources/pipeline-components/pipeline.yaml index 52245a05b..483244db1 100644 --- a/docs/docs/resources/pipeline-components/pipeline.yaml +++ b/docs/docs/resources/pipeline-components/pipeline.yaml @@ -310,6 +310,7 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model + namespace: namespace # required # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required @@ -321,18 +322,7 @@ output_role1: output_topic1 output_role2: output_topic2 nameOverride: override-with-this-name # kafka-app-specific - namespace: namespace # required - # Helm repository configuration (optional) - # If not set the helm repo add will not be called. Useful when using local Helm charts - repo_config: - repository_name: bakdata-streams-bootstrap # required - url: https://bakdata.github.io/streams-bootstrap/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.0" # Helm chart version + version: "2.12.0" # Helm chart version # StreamsApp component that configures a streams bootstrap app. # More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap - type: streams-app # required @@ -379,6 +369,7 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model + namespace: namespace # required # No arbitrary keys are allowed under `app`here # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app @@ -435,15 +426,4 @@ topics: # List of auto-generated Kafka Streams topics used by the streams app. - topic1 - topic2 - namespace: namespace # required - # Helm repository configuration (optional) - # If not set the helm repo add will not be called. Useful when using local Helm charts - repo_config: - repository_name: bakdata-streams-bootstrap # required - url: https://bakdata.github.io/streams-bootstrap/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.0" # Helm chart version + version: "2.12.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-components/producer-app.yaml b/docs/docs/resources/pipeline-components/producer-app.yaml index 9a698e1b3..86721cac7 100644 --- a/docs/docs/resources/pipeline-components/producer-app.yaml +++ b/docs/docs/resources/pipeline-components/producer-app.yaml @@ -27,6 +27,7 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model + namespace: namespace # required # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required @@ -38,15 +39,4 @@ output_role1: output_topic1 output_role2: output_topic2 nameOverride: override-with-this-name # kafka-app-specific - namespace: namespace # required - # Helm repository configuration (optional) - # If not set the helm repo add will not be called. Useful when using local Helm charts - repo_config: - repository_name: bakdata-streams-bootstrap # required - url: https://bakdata.github.io/streams-bootstrap/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.0" # Helm chart version + version: "2.12.0" # Helm chart version diff --git a/docs/docs/resources/pipeline-components/streams-app.yaml b/docs/docs/resources/pipeline-components/streams-app.yaml index c333631ef..e9f303686 100644 --- a/docs/docs/resources/pipeline-components/streams-app.yaml +++ b/docs/docs/resources/pipeline-components/streams-app.yaml @@ -44,6 +44,7 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model + namespace: namespace # required # No arbitrary keys are allowed under `app`here # Allowed configs: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app @@ -100,15 +101,4 @@ topics: # List of auto-generated Kafka Streams topics used by the streams app. - topic1 - topic2 - namespace: namespace # required - # Helm repository configuration (optional) - # If not set the helm repo add will not be called. Useful when using local Helm charts - repo_config: - repository_name: bakdata-streams-bootstrap # required - url: https://bakdata.github.io/streams-bootstrap/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.0" # Helm chart version + version: "2.12.0" # Helm chart version diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 9df7d8342..211b13928 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -531,8 +531,8 @@ }, "required": [ "name", - "app", - "namespace" + "namespace", + "app" ], "title": "ProducerApp", "type": "object" @@ -775,8 +775,8 @@ }, "required": [ "name", - "app", - "namespace" + "namespace", + "app" ], "title": "StreamsApp", "type": "object" diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 355c31cd3..2d6a586b2 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -28,7 +28,7 @@ def helm_chart(self) -> str: ) -class ProducerApp(StreamsBootstrap, KafkaApp): +class ProducerApp(KafkaApp, StreamsBootstrap): """Producer component. This producer holds configuration to use as values for the streams-bootstrap diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index beadd574c..2c632e882 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -22,7 +22,7 @@ def helm_chart(self) -> str: return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" -class StreamsApp(StreamsBootstrap, KafkaApp): +class StreamsApp(KafkaApp, StreamsBootstrap): """StreamsApp component that configures a streams-bootstrap app. :param app: Application-specific settings From 2e2c7baed8fc6482468a1e28b5fd4f387e78da18 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 3 Jan 2024 18:08:20 +0100 Subject: [PATCH 20/27] Update components hierarchy diagram --- docs/docs/resources/architecture/components-hierarchy.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/docs/resources/architecture/components-hierarchy.md b/docs/docs/resources/architecture/components-hierarchy.md index 190c44f82..ce24acc46 100644 --- a/docs/docs/resources/architecture/components-hierarchy.md +++ b/docs/docs/resources/architecture/components-hierarchy.md @@ -1,10 +1,13 @@ ```mermaid flowchart BT KubernetesApp --> PipelineComponent + KafkaApp --> PipelineComponent HelmApp --> KubernetesApp - KafkaApp --> HelmApp + StreamsBootstrap --> HelmApp StreamsApp --> KafkaApp + StreamsApp --> StreamsBootstrap ProducerApp --> KafkaApp + ProducerApp --> StreamsBootstrap KafkaConnector --> PipelineComponent KafkaSourceConnector --> KafkaConnector KafkaSinkConnector --> KafkaConnector @@ -12,6 +15,7 @@ flowchart BT click KubernetesApp "/kpops/user/core-concepts/components/kubernetes-app" click HelmApp "/kpops/user/core-concepts/components/helm-app" click KafkaApp "/kpops/user/core-concepts/components/kafka-app" + click StreamsBootstrap "/kpops/user/core-concepts/components/streams-bootstrap" click StreamsApp "/kpops/user/core-concepts/components/streams-app" click ProducerApp "/kpops/user/core-concepts/components/producer-app" click KafkaConnector "/kpops/user/core-concepts/components/kafka-connector" From 02c0ef7120a0c2ef0635b363330d2543d79326e1 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Thu, 4 Jan 2024 13:05:10 +0100 Subject: [PATCH 21/27] Fix docs --- kpops/components/streams_bootstrap/streams/model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 0447cf272..95100b966 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -28,7 +28,7 @@ class StreamsConfig(KafkaStreamsConfig): :param output_topic: Output topic, defaults to None :param error_topic: Error topic, defaults to None :param config: Configuration, defaults to {} - :param delete_output: Whether the output topics with their associated schemas and the consumer group should be deleted during the cleanup, defaults to False + :param delete_output: Whether the output topics with their associated schemas and the consumer group should be deleted during the cleanup, defaults to None """ input_topics: list[str] = Field( From 83ac3989da4e2d1ef39e2cb57d7b72c1d819174f Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Mon, 8 Jan 2024 13:31:18 +0100 Subject: [PATCH 22/27] Update defaults schema --- docs/docs/schema/defaults.json | 161 ++++++++++++++++++++++++--------- 1 file changed, 117 insertions(+), 44 deletions(-) diff --git a/docs/docs/schema/defaults.json b/docs/docs/schema/defaults.json index 137b547e9..aa392179b 100644 --- a/docs/docs/schema/defaults.json +++ b/docs/docs/schema/defaults.json @@ -247,36 +247,12 @@ "title": "Name", "type": "string" }, - "namespace": { - "description": "Namespace in which the component shall be deployed", - "title": "Namespace", - "type": "string" - }, "prefix": { "default": "${pipeline_name}-", "description": "Pipeline prefix that will prefix every component name. If you wish to not have any prefix you can specify an empty string.", "title": "Prefix", "type": "string" }, - "repo_config": { - "allOf": [ - { - "$ref": "#/$defs/HelmRepoConfig" - } - ], - "default": { - "repo_auth_flags": { - "ca_file": null, - "cert_file": null, - "insecure_skip_tls_verify": false, - "password": null, - "username": null - }, - "repository_name": "bakdata-streams-bootstrap", - "url": "https://bakdata.github.io/streams-bootstrap/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" - }, "to": { "anyOf": [ { @@ -288,24 +264,10 @@ ], "default": null, "description": "Topic(s) into which the component will write output" - }, - "version": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": "2.9.0", - "description": "Helm chart version", - "title": "Version" } }, "required": [ "name", - "namespace", "app" ], "title": "KafkaApp", @@ -837,7 +799,7 @@ }, "ProducerApp": { "additionalProperties": true, - "description": "Producer component.\nThis producer holds configuration to use as values for the streams bootstrap producer helm chart. Note that the producer does not support error topics.", + "description": "Producer component.\nThis producer holds configuration to use as values for the streams-bootstrap producer Helm chart. Note that the producer does not support error topics.", "properties": { "app": { "allOf": [ @@ -1079,7 +1041,7 @@ }, "StreamsApp": { "additionalProperties": true, - "description": "StreamsApp component that configures a streams bootstrap app.", + "description": "StreamsApp component that configures a streams-bootstrap app.", "properties": { "app": { "allOf": [ @@ -1258,7 +1220,7 @@ }, "StreamsAppValues": { "additionalProperties": true, - "description": "StreamsBoostrap app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", + "description": "streams-bootstrap app configurations.\nThe attributes correspond to keys and values that are used as values for the streams bootstrap helm chart.", "properties": { "autoscaling": { "anyOf": [ @@ -1270,7 +1232,7 @@ } ], "default": null, - "description": "Kubernetes Event-driven Autoscaling config" + "description": "Kubernetes event-driven autoscaling config" }, "nameOverride": { "anyOf": [ @@ -1291,7 +1253,7 @@ "$ref": "#/$defs/StreamsConfig" } ], - "description": "Streams Bootstrap streams section" + "description": "streams-bootstrap streams section" } }, "required": [ @@ -1300,6 +1262,100 @@ "title": "StreamsAppValues", "type": "object" }, + "StreamsBootstrap": { + "additionalProperties": true, + "description": "Base for components with a streams-bootstrap Helm chart.", + "properties": { + "app": { + "allOf": [ + { + "$ref": "#/$defs/HelmAppValues" + } + ], + "description": "Helm app values" + }, + "from": { + "anyOf": [ + { + "$ref": "#/$defs/FromSection" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Topic(s) and/or components from which the component will read input", + "title": "From" + }, + "name": { + "description": "Component name", + "title": "Name", + "type": "string" + }, + "namespace": { + "description": "Namespace in which the component shall be deployed", + "title": "Namespace", + "type": "string" + }, + "prefix": { + "default": "${pipeline_name}-", + "description": "Pipeline prefix that will prefix every component name. If you wish to not have any prefix you can specify an empty string.", + "title": "Prefix", + "type": "string" + }, + "repo_config": { + "allOf": [ + { + "$ref": "#/$defs/HelmRepoConfig" + } + ], + "default": { + "repo_auth_flags": { + "ca_file": null, + "cert_file": null, + "insecure_skip_tls_verify": false, + "password": null, + "username": null + }, + "repository_name": "bakdata-streams-bootstrap", + "url": "https://bakdata.github.io/streams-bootstrap/" + }, + "description": "Configuration of the Helm chart repo to be used for deploying the component" + }, + "to": { + "anyOf": [ + { + "$ref": "#/$defs/ToSection" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Topic(s) into which the component will write output" + }, + "version": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": "2.9.0", + "description": "Helm chart version", + "title": "Version" + } + }, + "required": [ + "name", + "namespace", + "app" + ], + "title": "StreamsBootstrap", + "type": "object" + }, "StreamsConfig": { "additionalProperties": true, "description": "Streams Bootstrap streams section.", @@ -1315,6 +1371,19 @@ "title": "Config", "type": "object" }, + "deleteOutput": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Whether the output topics with their associated schemas and the consumer group should be deleted during the cleanup", + "title": "Deleteoutput" + }, "errorTopic": { "anyOf": [ { @@ -1569,6 +1638,9 @@ }, "streams-app": { "$ref": "#/$defs/StreamsApp" + }, + "streams-bootstrap": { + "$ref": "#/$defs/StreamsBootstrap" } }, "required": [ @@ -1580,7 +1652,8 @@ "kubernetes-app", "pipeline-component", "producer-app", - "streams-app" + "streams-app", + "streams-bootstrap" ], "title": "DefaultsSchema", "type": "object" From 45c58d80af780bbfa433cc4f178d058ca55612c0 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Mon, 8 Jan 2024 13:39:14 +0100 Subject: [PATCH 23/27] Add docs for streams-bootstrap --- docs/docs/user/core-concepts/components/producer-app.md | 2 +- docs/docs/user/core-concepts/components/streams-app.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/docs/user/core-concepts/components/producer-app.md b/docs/docs/user/core-concepts/components/producer-app.md index 1f55fa6d9..bff598d53 100644 --- a/docs/docs/user/core-concepts/components/producer-app.md +++ b/docs/docs/user/core-concepts/components/producer-app.md @@ -1,6 +1,6 @@ # ProducerApp -Subclass of [_KafkaApp_](kafka-app.md). +Subclass of [_KafkaApp_](kafka-app.md) and [_StreamsBootstrap_](streams-bootstrap.md). ### Usage diff --git a/docs/docs/user/core-concepts/components/streams-app.md b/docs/docs/user/core-concepts/components/streams-app.md index ac881ade2..d34705062 100644 --- a/docs/docs/user/core-concepts/components/streams-app.md +++ b/docs/docs/user/core-concepts/components/streams-app.md @@ -1,6 +1,6 @@ # StreamsApp -Subclass of [_KafkaApp_](kafka-app.md). +Subclass of [_KafkaApp_](kafka-app.md) and [_StreamsBootstrap_](streams-bootstrap.md). ### Usage From 7c924211189e9f942ecbda182c019f79e115d659 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Mon, 8 Jan 2024 13:39:14 +0100 Subject: [PATCH 24/27] Add docs for streams-bootstrap --- .../components/streams-bootstrap.md | 25 +++++++++++++++++++ docs/mkdocs.yml | 1 + 2 files changed, 26 insertions(+) create mode 100644 docs/docs/user/core-concepts/components/streams-bootstrap.md diff --git a/docs/docs/user/core-concepts/components/streams-bootstrap.md b/docs/docs/user/core-concepts/components/streams-bootstrap.md new file mode 100644 index 000000000..52bb5fa0e --- /dev/null +++ b/docs/docs/user/core-concepts/components/streams-bootstrap.md @@ -0,0 +1,25 @@ +# StreamsApp + +Subclass of [_HelmApp_](helm-app.md). + +### Usage + +Configures a Helm app with [streams-bootstrap Helm charts](https://github.com/bakdata/streams-bootstrap){target=_blank}. + +### Operations + +#### deploy + +Deploy using Helm. + +#### destroy + +Uninstall Helm release. + +#### reset + +Do nothing. + +#### clean + +Do nothing. diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index c6ef09c16..d436c94a5 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -95,6 +95,7 @@ nav: - Overview: user/core-concepts/components/overview.md - KubernetesApp: user/core-concepts/components/kubernetes-app.md - HelmApp: user/core-concepts/components/helm-app.md + - StreamsBootstrap: user/core-concepts/components/streams-bootstrap.md - KafkaApp: user/core-concepts/components/kafka-app.md - StreamsApp: user/core-concepts/components/streams-app.md - ProducerApp: user/core-concepts/components/producer-app.md From 916c239595ac1dda9381d50259d27939466a529c Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Thu, 11 Jan 2024 16:51:57 +0100 Subject: [PATCH 25/27] Update schema --- .../pipeline-defaults/defaults-kafka-app.yaml | 16 ++++++++-------- .../resources/pipeline-defaults/defaults.yaml | 16 ++++++++-------- docs/docs/schema/defaults.json | 2 +- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml index fa2c93020..a27bb38d1 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml @@ -5,16 +5,16 @@ kafka-app: # Pipeline prefix that will prefix every component name. If you wish to not # have any prefix you can specify an empty string. - prefix: ${pipeline_name}- + prefix: ${pipeline.name}- from: # Must not be null topics: # read from topic - ${pipeline_name}-input-topic: + ${pipeline.name}-input-topic: type: input # Implied when role is NOT specified - ${pipeline_name}-extra-topic: + ${pipeline.name}-extra-topic: role: topic-role # Implies `type` to be extra - ${pipeline_name}-input-pattern-topic: + ${pipeline.name}-input-pattern-topic: type: pattern # Implied to be an input pattern if `role` is undefined - ${pipeline_name}-extra-pattern-topic: + ${pipeline.name}-extra-pattern-topic: type: pattern # Implied to be an extra pattern if `role` is defined role: some-role components: # read from specific component @@ -30,11 +30,11 @@ kafka-app: # Topic(s) into which the component will write output to: topics: - ${pipeline_name}-output-topic: + ${pipeline.name}-output-topic: type: output # Implied when role is NOT specified - ${pipeline_name}-extra-topic: + ${pipeline.name}-extra-topic: role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined - ${pipeline_name}-error-topic: + ${pipeline.name}-error-topic: type: error # Currently KPOps supports Avro and JSON schemas. key_schema: key-schema # must implement SchemaProvider to use diff --git a/docs/docs/resources/pipeline-defaults/defaults.yaml b/docs/docs/resources/pipeline-defaults/defaults.yaml index f013ca384..05487c7c0 100644 --- a/docs/docs/resources/pipeline-defaults/defaults.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults.yaml @@ -26,16 +26,16 @@ helm-app: kafka-app: # Pipeline prefix that will prefix every component name. If you wish to not # have any prefix you can specify an empty string. - prefix: ${pipeline_name}- + prefix: ${pipeline.name}- from: # Must not be null topics: # read from topic - ${pipeline_name}-input-topic: + ${pipeline.name}-input-topic: type: input # Implied when role is NOT specified - ${pipeline_name}-extra-topic: + ${pipeline.name}-extra-topic: role: topic-role # Implies `type` to be extra - ${pipeline_name}-input-pattern-topic: + ${pipeline.name}-input-pattern-topic: type: pattern # Implied to be an input pattern if `role` is undefined - ${pipeline_name}-extra-pattern-topic: + ${pipeline.name}-extra-pattern-topic: type: pattern # Implied to be an extra pattern if `role` is defined role: some-role components: # read from specific component @@ -51,11 +51,11 @@ kafka-app: # Topic(s) into which the component will write output to: topics: - ${pipeline_name}-output-topic: + ${pipeline.name}-output-topic: type: output # Implied when role is NOT specified - ${pipeline_name}-extra-topic: + ${pipeline.name}-extra-topic: role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined - ${pipeline_name}-error-topic: + ${pipeline.name}-error-topic: type: error # Currently KPOps supports Avro and JSON schemas. key_schema: key-schema # must implement SchemaProvider to use diff --git a/docs/docs/schema/defaults.json b/docs/docs/schema/defaults.json index 6f1776b60..06ec5fdc0 100644 --- a/docs/docs/schema/defaults.json +++ b/docs/docs/schema/defaults.json @@ -1298,7 +1298,7 @@ "type": "string" }, "prefix": { - "default": "${pipeline_name}-", + "default": "${pipeline.name}-", "description": "Pipeline prefix that will prefix every component name. If you wish to not have any prefix you can specify an empty string.", "title": "Prefix", "type": "string" From 125c20f9e26d418d460cdbad092d581ce4a81ee6 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Thu, 11 Jan 2024 17:34:13 +0100 Subject: [PATCH 26/27] Rename variable in test --- tests/components/test_streams_bootstrap.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/components/test_streams_bootstrap.py b/tests/components/test_streams_bootstrap.py index bd664bced..127485e30 100644 --- a/tests/components/test_streams_bootstrap.py +++ b/tests/components/test_streams_bootstrap.py @@ -34,7 +34,7 @@ def handlers(self) -> ComponentHandlers: ) def test_default_configs(self, config: KpopsConfig, handlers: ComponentHandlers): - streams_bootstrap_helm_app = StreamsBootstrap( + streams_bootstrap = StreamsBootstrap( name="example-name", config=config, handlers=handlers, @@ -43,20 +43,20 @@ def test_default_configs(self, config: KpopsConfig, handlers: ComponentHandlers) "app": {}, }, ) - assert streams_bootstrap_helm_app.repo_config == HelmRepoConfig( + assert streams_bootstrap.repo_config == HelmRepoConfig( repository_name="bakdata-streams-bootstrap", url="https://bakdata.github.io/streams-bootstrap/", ) - assert streams_bootstrap_helm_app.version == "2.9.0" - assert streams_bootstrap_helm_app.namespace == "test-namespace" + assert streams_bootstrap.version == "2.9.0" + assert streams_bootstrap.namespace == "test-namespace" - def test_should_deploy_streams_bootstrap_helm_app( + def test_should_deploy_streams_bootstrap_app( self, config: KpopsConfig, handlers: ComponentHandlers, mocker: MockerFixture, ): - streams_bootstrap_helm_app = StreamsBootstrap( + streams_bootstrap = StreamsBootstrap( name="example-name", config=config, handlers=handlers, @@ -72,10 +72,10 @@ def test_should_deploy_streams_bootstrap_helm_app( }, ) helm_upgrade_install = mocker.patch.object( - streams_bootstrap_helm_app.helm, "upgrade_install" + streams_bootstrap.helm, "upgrade_install" ) print_helm_diff = mocker.patch.object( - streams_bootstrap_helm_app.dry_run_handler, "print_helm_diff" + streams_bootstrap.dry_run_handler, "print_helm_diff" ) mocker.patch.object( StreamsBootstrap, @@ -84,7 +84,7 @@ def test_should_deploy_streams_bootstrap_helm_app( new_callable=mocker.PropertyMock, ) - streams_bootstrap_helm_app.deploy(dry_run=True) + streams_bootstrap.deploy(dry_run=True) print_helm_diff.assert_called_once() helm_upgrade_install.assert_called_once_with( From 059632e4480e171da976a5b8d49768ba29b7f009 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Thu, 11 Jan 2024 17:46:41 +0100 Subject: [PATCH 27/27] Cover breaking change in migration guide --- docs/docs/user/migration-guide/v2-v3.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/docs/user/migration-guide/v2-v3.md b/docs/docs/user/migration-guide/v2-v3.md index c4b42c3fa..2c1eef100 100644 --- a/docs/docs/user/migration-guide/v2-v3.md +++ b/docs/docs/user/migration-guide/v2-v3.md @@ -40,6 +40,22 @@ All Helm-specific parts of the built-in [`KubernetesApp`](../core-concepts/compo ... ``` +## [Create StreamsBootstrap component & refactor cleanup jobs as individual HelmApp](https://github.com/bakdata/kpops/pull/398) + +Previously the default `KafkaApp` component configured the [streams-bootstrap](https://bakdata.github.io/streams-bootstrap/) Helm Charts. Now, this component is no longer tied to Helm (or Kubernetes). Instead, there is a new `StreamsBootstrap` component that configures the Helm Chart repository for the components that use it, e.g. `StreamsApp` and `ProducerApp`. If you are using non-default values for the Helm Chart repository or version, it has to be updated as shown below. + +#### defaults.yaml + +```diff + kafka-app: + app: + streams: ... + ++ streams-bootstrap: + repo_config: ... + version: ... +``` + ## [Make Kafka REST Proxy & Kafka Connect hosts default and improve Schema Registry config](https://github.com/bakdata/kpops/pull/354) The breaking changes target the `config.yaml` file: