From dac1cc9640d8204f725b220cc9a1a0a5b62891d0 Mon Sep 17 00:00:00 2001 From: Ivan Yordanov Date: Tue, 12 Dec 2023 16:41:28 +0200 Subject: [PATCH] Change substitution variables separator to `.` (#388) Co-authored-by: Salomon Popp --- .../resources/variables/config_env_vars.env | 4 +- .../resources/variables/config_env_vars.md | 4 +- .../variables/variable_substitution.yaml | 16 ++++---- docs/docs/schema/config.json | 8 ++-- .../core-concepts/variables/substitution.md | 6 +-- docs/docs/user/migration-guide/v2-v3.md | 29 ++++++++++++++ .../bakdata/atm-fraud-detection/config.yaml | 4 +- .../bakdata/atm-fraud-detection/defaults.yaml | 2 +- kpops/config.py | 4 +- kpops/pipeline_generator/pipeline.py | 5 ++- kpops/utils/dict_ops.py | 40 +++++++++++++++++++ kpops/utils/yaml_loading.py | 10 ++++- tests/cli/test_kpops_config.py | 4 +- .../infinite_pipeline.yaml | 6 +-- .../component-type-substitution/pipeline.yaml | 16 ++++---- .../resources/custom-config/config.yaml | 4 +- tests/pipeline/resources/defaults.yaml | 4 +- .../kafka-connect-sink-config/config.yaml | 4 +- .../no-topics-defaults/defaults.yaml | 2 +- .../defaults_development.yaml | 2 +- .../defaults.yaml | 2 +- .../pipeline-with-env-defaults/defaults.yaml | 4 +- .../defaults_development.yaml | 2 +- tests/pipeline/test_components/components.py | 6 +-- 24 files changed, 133 insertions(+), 55 deletions(-) diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index 27ea591a7..b7e1a2ced 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -18,10 +18,10 @@ KPOPS_KAFKA_BROKERS # No default value, required KPOPS_DEFAULTS_FILENAME_PREFIX=defaults # topic_name_config.default_output_topic_name # Configures the value for the variable ${output_topic_name} -KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME=${pipeline_name}-${component_name} +KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME=${pipeline_name}-${component.name} # topic_name_config.default_error_topic_name # Configures the value for the variable ${error_topic_name} -KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME=${pipeline_name}-${component_name}-error +KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME=${pipeline_name}-${component.name}-error # schema_registry.enabled # Whether the Schema Registry handler should be initialized. KPOPS_SCHEMA_REGISTRY__ENABLED=False diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md index 53f540387..f9b9854ac 100644 --- a/docs/docs/resources/variables/config_env_vars.md +++ b/docs/docs/resources/variables/config_env_vars.md @@ -5,8 +5,8 @@ These variables are a lower priority alternative to the settings in `config.yaml |KPOPS_DEFAULTS_PATH |. |False |The path to the folder containing the defaults.yaml file and the environment defaults files. Paths can either be absolute or relative to `config.yaml`|defaults_path | |KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers | |KPOPS_DEFAULTS_FILENAME_PREFIX |defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix | -|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline_name}-${component_name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name| -|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME |${pipeline_name}-${component_name}-error|False |Configures the value for the variable ${error_topic_name} |topic_name_config.default_error_topic_name | +|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_OUTPUT_TOPIC_NAME|${pipeline_name}-${component.name} |False |Configures the value for the variable ${output_topic_name} |topic_name_config.default_output_topic_name| +|KPOPS_TOPIC_NAME_CONFIG__DEFAULT_ERROR_TOPIC_NAME |${pipeline_name}-${component.name}-error|False |Configures the value for the variable ${error_topic_name} |topic_name_config.default_error_topic_name | |KPOPS_SCHEMA_REGISTRY__ENABLED |False |False |Whether the Schema Registry handler should be initialized. |schema_registry.enabled | |KPOPS_SCHEMA_REGISTRY__URL |http://localhost:8081/ |False |Address of the Schema Registry. |schema_registry.url | |KPOPS_KAFKA_REST__URL |http://localhost:8082/ |False |Address of the Kafka REST Proxy. |kafka_rest.url | diff --git a/docs/docs/resources/variables/variable_substitution.yaml b/docs/docs/resources/variables/variable_substitution.yaml index 16e042586..8a4cf60ea 100644 --- a/docs/docs/resources/variables/variable_substitution.yaml +++ b/docs/docs/resources/variables/variable_substitution.yaml @@ -1,9 +1,9 @@ - type: scheduled-producer app: labels: - app_type: "${component_type}" - app_name: "${component_name}" - app_schedule: "${component_app_schedule}" + app_type: "${component.type}" + app_name: "${component.name}" + app_schedule: "${component.app.schedule}" commandLine: FAKE_ARG: "fake-arg-value" schedule: "30 3/8 * * *" @@ -20,11 +20,11 @@ name: "filter-app" app: labels: - app_type: "${component_type}" - app_name: "${component_name}" - app_resources_requests_memory: "${component_app_resources_requests_memory}" - ${component_type}: "${component_app_labels_app_name}-${component_app_labels_app_type}" - test_placeholder_in_placeholder: "${component_app_labels_${component_type}}" + app_type: "${component.type}" + app_name: "${component.name}" + app_resources_requests_memory: "${component.app.resources.requests.memory}" + ${component.type}: "${component.app.labels.app_name}-${component.app.labels.app_type}" + test_placeholder_in_placeholder: "${component.app.labels.${component.type}}" commandLine: TYPE: "nothing" resources: diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index 524049a09..7708ed04c 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -119,13 +119,13 @@ "description": "Configure the topic name variables you can use in the pipeline definition.", "properties": { "default_error_topic_name": { - "default": "${pipeline_name}-${component_name}-error", + "default": "${pipeline_name}-${component.name}-error", "description": "Configures the value for the variable ${error_topic_name}", "title": "Default Error Topic Name", "type": "string" }, "default_output_topic_name": { - "default": "${pipeline_name}-${component_name}", + "default": "${pipeline_name}-${component.name}", "description": "Configures the value for the variable ${output_topic_name}", "title": "Default Output Topic Name", "type": "string" @@ -246,8 +246,8 @@ } ], "default": { - "default_error_topic_name": "${pipeline_name}-${component_name}-error", - "default_output_topic_name": "${pipeline_name}-${component_name}" + "default_error_topic_name": "${pipeline_name}-${component.name}-error", + "default_output_topic_name": "${pipeline_name}-${component.name}" }, "description": "Configure the topic name variables you can use in the pipeline definition." } diff --git a/docs/docs/user/core-concepts/variables/substitution.md b/docs/docs/user/core-concepts/variables/substitution.md index 4ef9b1b25..71782180d 100644 --- a/docs/docs/user/core-concepts/variables/substitution.md +++ b/docs/docs/user/core-concepts/variables/substitution.md @@ -6,7 +6,7 @@ KPOps supports the usage of placeholders and environment variables in [pipeline These variables can be used in a component's definition to refer to any of its attributes, including ones that the user has defined in the defaults. -All of them are prefixed with `component_` and follow the following form: `component_{attribute_name}`. If the attribute itself contains attributes, they can be referred to like this: `component_{attribute_name}_{subattribute_name}`. +All of them are prefixed with `component.` and follow the following form: `component.{attribute_name}`. If the attribute itself contains attributes, they can be referred to like this: `component.{attribute_name}.{subattribute_name}`. @@ -26,8 +26,8 @@ These variables include all fields in the [config](../config.md) and refer to th !!! info Aliases - `error_topic_name` is an alias for `topic_name_config_default_error_topic_name` - `output_topic_name` is an alias for `topic_name_config_default_output_topic_name` + `error_topic_name` is an alias for `topic_name_config.default_error_topic_name` + `output_topic_name` is an alias for `topic_name_config.default_output_topic_name` diff --git a/docs/docs/user/migration-guide/v2-v3.md b/docs/docs/user/migration-guide/v2-v3.md index 6f5c5a1bf..d44c49503 100644 --- a/docs/docs/user/migration-guide/v2-v3.md +++ b/docs/docs/user/migration-guide/v2-v3.md @@ -81,3 +81,32 @@ The `--config` flag in the CLI now points to the directory that contains `config enabled: true url: "http://my-custom-sr.url:8081" ``` + +## [Change substitution variables separator to `.`](https://github.com/bakdata/kpops/pull/388) + +The delimiter in the substitution variables is changed to `.`. + +#### pipeline.yaml and default.yaml + +```diff +steps: + - type: scheduled-producer + app: + labels: +- app_type: "${component_type}" +- app_name: "${component_name}" +- app_schedule: "${component_app_schedule}" ++ app_type: "${component.type}" ++ app_name: "${component.name}" ++ app_schedule: "${component.app.schedule}" +``` + +#### config.yaml + +```diff +topic_name_config: +- default_error_topic_name: "${pipeline_name}-${component_name}-dead-letter-topic" +- default_output_topic_name: "${pipeline_name}-${component_name}-topic" ++ default_error_topic_name: "${pipeline_name}-${component.name}-dead-letter-topic" ++ default_output_topic_name: "${pipeline_name}-${component.name}-topic" +``` diff --git a/examples/bakdata/atm-fraud-detection/config.yaml b/examples/bakdata/atm-fraud-detection/config.yaml index 2f158fd62..41740ae77 100644 --- a/examples/bakdata/atm-fraud-detection/config.yaml +++ b/examples/bakdata/atm-fraud-detection/config.yaml @@ -1,6 +1,6 @@ topic_name_config: - default_error_topic_name: "${pipeline_name}-${component_name}-dead-letter-topic" - default_output_topic_name: "${pipeline_name}-${component_name}-topic" + default_error_topic_name: "${pipeline_name}-${component.name}-dead-letter-topic" + default_output_topic_name: "${pipeline_name}-${component.name}-topic" kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" diff --git a/examples/bakdata/atm-fraud-detection/defaults.yaml b/examples/bakdata/atm-fraud-detection/defaults.yaml index e3ba49c67..2e9079f4f 100644 --- a/examples/bakdata/atm-fraud-detection/defaults.yaml +++ b/examples/bakdata/atm-fraud-detection/defaults.yaml @@ -11,7 +11,7 @@ kafka-app: app: streams: brokers: ${kafka_brokers} - schemaRegistryUrl: ${schema_registry_url} + schemaRegistryUrl: ${schema_registry.url} optimizeLeaveGroupBehavior: false producer-app: diff --git a/kpops/config.py b/kpops/config.py index 6e3188359..172ff4305 100644 --- a/kpops/config.py +++ b/kpops/config.py @@ -21,11 +21,11 @@ class TopicNameConfig(BaseSettings): """Configure the topic name variables you can use in the pipeline definition.""" default_output_topic_name: str = Field( - default="${pipeline_name}-${component_name}", + default="${pipeline_name}-${component.name}", description="Configures the value for the variable ${output_topic_name}", ) default_error_topic_name: str = Field( - default="${pipeline_name}-${component_name}-error", + default="${pipeline_name}-${component.name}-error", description="Configures the value for the variable ${error_topic_name}", ) diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py index b0bcf2676..07ea94e1e 100644 --- a/kpops/pipeline_generator/pipeline.py +++ b/kpops/pipeline_generator/pipeline.py @@ -295,9 +295,12 @@ def substitute_in_component(self, component_as_dict: dict) -> dict: component_as_dict, "component", substitution_hardcoded, + separator=".", ) substitution = generate_substitution( - config.model_dump(mode="json"), existing_substitution=component_substitution + config.model_dump(mode="json"), + existing_substitution=component_substitution, + separator=".", ) return json.loads( diff --git a/kpops/utils/dict_ops.py b/kpops/utils/dict_ops.py index c53cc383d..fa8aecd7b 100644 --- a/kpops/utils/dict_ops.py +++ b/kpops/utils/dict_ops.py @@ -1,6 +1,11 @@ +import re +from collections import ChainMap as _ChainMap from collections.abc import Mapping +from string import Template from typing import Any, TypeVar +from typing_extensions import override + def update_nested_pair(original_dict: dict, other_dict: Mapping) -> dict: """Nested update for 2 dictionaries. @@ -99,3 +104,38 @@ def generate_substitution( return update_nested( existing_substitution or {}, flatten_mapping(input, prefix, separator) ) + + +_sentinel_dict = {} + + +class ImprovedTemplate(Template): + """Introduces the dot as an allowed character in placeholders.""" + + idpattern = r"(?a:[_a-z][_.a-z0-9]*)" + + @override + def safe_substitute(self, mapping=_sentinel_dict, /, **kws) -> str: + if mapping is _sentinel_dict: + mapping = kws + elif kws: + mapping = _ChainMap(kws, mapping) + + # Helper function for .sub() + def convert(mo: re.Match): + named = mo.group("named") or mo.group("braced") + if named is not None: + try: + if "." not in named: + return str(mapping[named]) + return str(mapping[named.replace(".", "__")]) + except KeyError: + return mo.group() + if mo.group("escaped") is not None: + return self.delimiter + if mo.group("invalid") is not None: + return mo.group() + msg = "Unrecognized named group in pattern" + raise ValueError(msg, self.pattern) + + return self.pattern.sub(convert, self.template) diff --git a/kpops/utils/yaml_loading.py b/kpops/utils/yaml_loading.py index fb810c193..668a609cc 100644 --- a/kpops/utils/yaml_loading.py +++ b/kpops/utils/yaml_loading.py @@ -1,12 +1,13 @@ from collections.abc import Mapping from pathlib import Path -from string import Template from typing import Any import yaml from cachetools import cached from cachetools.keys import hashkey +from kpops.utils.dict_ops import ImprovedTemplate + def generate_hashkey( file_path: Path, substitution: Mapping[str, Any] | None = None @@ -33,7 +34,12 @@ def substitute(input: str, substitution: Mapping[str, Any] | None = None) -> str """ if not substitution: return input - return Template(input).safe_substitute(**substitution) + + def prepare_substitution(substitution: Mapping[str, Any]) -> dict[str, Any]: + """Replace dots with underscores in the substitution keys.""" + return {k.replace(".", "__"): v for k, v in substitution.items()} + + return ImprovedTemplate(input).safe_substitute(**prepare_substitution(substitution)) def substitute_nested(input: str, **kwargs) -> str: diff --git a/tests/cli/test_kpops_config.py b/tests/cli/test_kpops_config.py index 14994acb0..e52b2345a 100644 --- a/tests/cli/test_kpops_config.py +++ b/tests/cli/test_kpops_config.py @@ -18,11 +18,11 @@ def test_kpops_config_with_default_values(): assert default_config.defaults_filename_prefix == "defaults" assert ( default_config.topic_name_config.default_output_topic_name - == "${pipeline_name}-${component_name}" + == "${pipeline_name}-${component.name}" ) assert ( default_config.topic_name_config.default_error_topic_name - == "${pipeline_name}-${component_name}-error" + == "${pipeline_name}-${component.name}-error" ) assert default_config.schema_registry.enabled is False assert default_config.schema_registry.url == AnyHttpUrl("http://localhost:8081") diff --git a/tests/pipeline/resources/component-type-substitution/infinite_pipeline.yaml b/tests/pipeline/resources/component-type-substitution/infinite_pipeline.yaml index e01434ceb..17eba50a2 100644 --- a/tests/pipeline/resources/component-type-substitution/infinite_pipeline.yaml +++ b/tests/pipeline/resources/component-type-substitution/infinite_pipeline.yaml @@ -1,6 +1,6 @@ - type: converter app: labels: - l_1: ${component_app_labels_l_2} - l_2: ${component_app_labels_l_1} - infinite_nesting: ${component_app_labels} + l_1: ${component.app.labels.l_2} + l_2: ${component.app.labels.l_1} + infinite_nesting: ${component.app.labels} diff --git a/tests/pipeline/resources/component-type-substitution/pipeline.yaml b/tests/pipeline/resources/component-type-substitution/pipeline.yaml index 16e042586..8a4cf60ea 100644 --- a/tests/pipeline/resources/component-type-substitution/pipeline.yaml +++ b/tests/pipeline/resources/component-type-substitution/pipeline.yaml @@ -1,9 +1,9 @@ - type: scheduled-producer app: labels: - app_type: "${component_type}" - app_name: "${component_name}" - app_schedule: "${component_app_schedule}" + app_type: "${component.type}" + app_name: "${component.name}" + app_schedule: "${component.app.schedule}" commandLine: FAKE_ARG: "fake-arg-value" schedule: "30 3/8 * * *" @@ -20,11 +20,11 @@ name: "filter-app" app: labels: - app_type: "${component_type}" - app_name: "${component_name}" - app_resources_requests_memory: "${component_app_resources_requests_memory}" - ${component_type}: "${component_app_labels_app_name}-${component_app_labels_app_type}" - test_placeholder_in_placeholder: "${component_app_labels_${component_type}}" + app_type: "${component.type}" + app_name: "${component.name}" + app_resources_requests_memory: "${component.app.resources.requests.memory}" + ${component.type}: "${component.app.labels.app_name}-${component.app.labels.app_type}" + test_placeholder_in_placeholder: "${component.app.labels.${component.type}}" commandLine: TYPE: "nothing" resources: diff --git a/tests/pipeline/resources/custom-config/config.yaml b/tests/pipeline/resources/custom-config/config.yaml index 74910f62d..60410489d 100644 --- a/tests/pipeline/resources/custom-config/config.yaml +++ b/tests/pipeline/resources/custom-config/config.yaml @@ -1,7 +1,7 @@ defaults_path: ../no-topics-defaults topic_name_config: - default_error_topic_name: "${component_name}-dead-letter-topic" - default_output_topic_name: "${component_name}-test-topic" + default_error_topic_name: "${component.name}-dead-letter-topic" + default_output_topic_name: "${component.name}-test-topic" kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" kafka_connect: url: "http://localhost:8083" diff --git a/tests/pipeline/resources/defaults.yaml b/tests/pipeline/resources/defaults.yaml index e1223203b..b78293627 100644 --- a/tests/pipeline/resources/defaults.yaml +++ b/tests/pipeline/resources/defaults.yaml @@ -1,12 +1,12 @@ kubernetes-app: - name: "${component_type}" + name: "${component.type}" namespace: example-namespace kafka-app: app: streams: brokers: "${kafka_brokers}" - schema_registry_url: "${schema_registry_url}" + schema_registry_url: "${schema_registry.url}" version: "2.4.2" producer-app: {} # inherits from kafka-app diff --git a/tests/pipeline/resources/kafka-connect-sink-config/config.yaml b/tests/pipeline/resources/kafka-connect-sink-config/config.yaml index 151484205..572b695c7 100644 --- a/tests/pipeline/resources/kafka-connect-sink-config/config.yaml +++ b/tests/pipeline/resources/kafka-connect-sink-config/config.yaml @@ -1,8 +1,8 @@ defaults_path: .. kafka_brokers: "broker:9092" topic_name_config: - default_error_topic_name: ${component_type}-error-topic - default_output_topic_name: ${component_type}-output-topic + default_error_topic_name: ${component.type}-error-topic + default_output_topic_name: ${component.type}-output-topic helm_diff_config: enable: false kafka_connect: diff --git a/tests/pipeline/resources/no-topics-defaults/defaults.yaml b/tests/pipeline/resources/no-topics-defaults/defaults.yaml index 87d21d47d..ea3dd7d9e 100644 --- a/tests/pipeline/resources/no-topics-defaults/defaults.yaml +++ b/tests/pipeline/resources/no-topics-defaults/defaults.yaml @@ -2,7 +2,7 @@ kafka-app: app: streams: brokers: "${kafka_brokers}" - schemaRegistryUrl: "${schema_registry_url}" + schemaRegistryUrl: "${schema_registry.url}" producer-app: to: diff --git a/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml b/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml index 035691c2e..b6a05220f 100644 --- a/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml +++ b/tests/pipeline/resources/no-topics-defaults/defaults_development.yaml @@ -1,3 +1,3 @@ kubernetes-app: - name: "${component_type}-development" + name: "${component.type}-development" namespace: development-namespace diff --git a/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml b/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml index c67f869d9..b5954da19 100644 --- a/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml +++ b/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml @@ -8,4 +8,4 @@ kafka-app: app: streams: brokers: ${kafka_brokers} - schemaRegistryUrl: ${schema_registry_url} + schemaRegistryUrl: ${schema_registry.url} diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml index 44229f00e..f9505c0ab 100644 --- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml +++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml @@ -1,11 +1,11 @@ kubernetes-app: - name: ${component_type} + name: ${component.type} namespace: example-namespace kafka-app: app: streams: brokers: "${kafka_brokers}" - schemaRegistryUrl: "${schema_registry_url}" + schemaRegistryUrl: "${schema_registry.url}" producer-app: {} # inherits from kafka-app diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml index c7b863a92..80987e36e 100644 --- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml +++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml @@ -1,3 +1,3 @@ kubernetes-app: - name: ${component_type}-development + name: ${component.type}-development namespace: development-namespace diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index 84698c0b4..d45882ea1 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -51,10 +51,10 @@ def inflate(self) -> list[PipelineComponent]: }, to=ToSection( topics={ - TopicName("${component_type}"): TopicConfig( + TopicName("${component.type}"): TopicConfig( type=OutputTopicTypes.OUTPUT ), - TopicName("${component_name}"): TopicConfig( + TopicName("${component.name}"): TopicConfig( type=None, role="test" ), } @@ -68,7 +68,7 @@ def inflate(self) -> list[PipelineComponent]: to=ToSection( # type: ignore[reportGeneralTypeIssues] topics={ TopicName( - f"{self.full_name}-" + "${component_name}" + f"{self.full_name}-" + "${component.name}" ): TopicConfig(type=OutputTopicTypes.OUTPUT) } ).model_dump(),