Fix substitution #449

Merged: 52 commits, Feb 8, 2024
Changes from 49 commits

Commits (52)
f71da2a  test: minimal example  (sujuka99, Feb 5, 2024)
c59efee  Improve tests  (sujuka99, Feb 6, 2024)
0d3eb25  add much more minimal example  (sujuka99, Feb 6, 2024)
634ed0c  fix eof  (sujuka99, Feb 6, 2024)
4e48966  fix: substitution timing WIP  (sujuka99, Feb 6, 2024)
35fd009  Test using Python API  (disrupted, Feb 6, 2024)
e7722ed  Run the actual assertion  (disrupted, Feb 6, 2024)
b0e029c  Fix assertions  (disrupted, Feb 6, 2024)
1ff50e0  Implement fix  (disrupted, Feb 6, 2024)
169110e  Cosmetic  (disrupted, Feb 6, 2024)
2737a18  Fix pipeline_{environment}.yaml override  (disrupted, Feb 6, 2024)
1157aec  Fix double schema registry url  (disrupted, Feb 6, 2024)
052a7c2  Try model_dump without `by_alias=True`  (disrupted, Feb 6, 2024)
d0cbbbc  Fix Ruff diagnostics  (disrupted, Feb 6, 2024)
df84620  Fix schema generation  (disrupted, Feb 6, 2024)
986ab45  Fix Ruff  (disrupted, Feb 6, 2024)
d91787d  Fix substitution var notation in tests  (disrupted, Feb 6, 2024)
04ac93e  Update test  (disrupted, Feb 6, 2024)
5c89efe  Refactor  (disrupted, Feb 6, 2024)
48459ee  Improve performance  (disrupted, Feb 6, 2024)
2e5f543  Mark todo  (disrupted, Feb 6, 2024)
f49fcef  Fix KafkaConnector test  (disrupted, Feb 7, 2024)
588d668  Revert 1 commits  (disrupted, Feb 7, 2024)
fde7a54  Refactor & mark todo  (disrupted, Feb 7, 2024)
52678b2  Run substitute twice to fix test  (disrupted, Feb 7, 2024)
c9aefd3  Improve typing  (disrupted, Feb 7, 2024)
29a23bf  Clear KPOps env for component tests  (disrupted, Feb 7, 2024)
a19949a  Revert removal of `by_alias=True`  (disrupted, Feb 7, 2024)
0037dba  Fix duplicate schema_registry_url / schemaRegistryUrl in model dump  (disrupted, Feb 7, 2024)
01a4070  Fix order of pipeline steps for clean/reset  (disrupted, Feb 7, 2024)
3b5f494  Merge branch 'fix/clean-order' into fix/inflate-substitution  (disrupted, Feb 7, 2024)
a68f24a  Include Kafka app cleaner in generate dump  (disrupted, Feb 7, 2024)
536c83f  Fix recursion error  (disrupted, Feb 7, 2024)
57eb78e  Merge branch 'feat/cleaner-computed-property' into fix/inflate-substi…  (disrupted, Feb 7, 2024)
6a1f153  Fix validation too early  (disrupted, Feb 7, 2024)
c39d636  Update snapshots  (disrupted, Feb 7, 2024)
47b27c1  Cleanup arguments  (disrupted, Feb 7, 2024)
d7709f1  Rename tmp self  (disrupted, Feb 7, 2024)
d108fd1  Cleanup  (disrupted, Feb 7, 2024)
d4efc48  Remove from & to section from Kafka app cleaner  (disrupted, Feb 7, 2024)
c6bfbbc  Update snapshot  (disrupted, Feb 7, 2024)
9283043  Remove from & to section from Connector resetter  (disrupted, Feb 7, 2024)
c76f072  Update snapshots  (disrupted, Feb 7, 2024)
6b2a56c  Revert 4 commits  (disrupted, Feb 7, 2024)
c7f1868  Cosmetic  (disrupted, Feb 7, 2024)
cbd1172  remove duplicated substitution function  (sujuka99, Feb 8, 2024)
cfc7676  Remove redundant test.  (sujuka99, Feb 8, 2024)
3c9bf3e  Unused import  (sujuka99, Feb 8, 2024)
f4c6339  Merge remote-tracking branch 'origin/main' into fix/inflate-substitution  (sujuka99, Feb 8, 2024)
42b6d46  Remove leftover comments.  (sujuka99, Feb 8, 2024)
2d83e7f  Merge branch 'main' into fix/inflate-substitution  (disrupted, Feb 8, 2024)
094c714  Fix custom component name  (sujuka99, Feb 8, 2024)
Files changed
12 changes: 6 additions & 6 deletions docs/docs/schema/defaults.json
@@ -586,7 +586,7 @@
           "title": "Brokers",
           "type": "string"
         },
-        "schemaRegistryUrl": {
+        "schema_registry_url": {
           "anyOf": [
             {
               "type": "string"
@@ -597,7 +597,7 @@
           ],
           "default": null,
           "description": "URL of the schema registry",
-          "title": "Schemaregistryurl"
+          "title": "Schema Registry Url"
         }
       },
       "required": [
@@ -886,7 +886,7 @@
           "description": "Output topic",
           "title": "Outputtopic"
         },
-        "schemaRegistryUrl": {
+        "schema_registry_url": {
           "anyOf": [
             {
               "type": "string"
@@ -897,7 +897,7 @@
           ],
           "default": null,
           "description": "URL of the schema registry",
-          "title": "Schemaregistryurl"
+          "title": "Schema Registry Url"
         }
       },
       "required": [
@@ -1396,7 +1396,7 @@
           "description": "Output topic",
           "title": "Outputtopic"
         },
-        "schemaRegistryUrl": {
+        "schema_registry_url": {
           "anyOf": [
             {
               "type": "string"
@@ -1407,7 +1407,7 @@
           ],
           "default": null,
           "description": "URL of the schema registry",
-          "title": "Schemaregistryurl"
+          "title": "Schema Registry Url"
         }
       },
       "required": [
8 changes: 4 additions & 4 deletions docs/docs/schema/pipeline.json
@@ -576,7 +576,7 @@
           "description": "Output topic",
           "title": "Outputtopic"
         },
-        "schemaRegistryUrl": {
+        "schema_registry_url": {
           "anyOf": [
             {
               "type": "string"
@@ -587,7 +587,7 @@
           ],
           "default": null,
           "description": "URL of the schema registry",
-          "title": "Schemaregistryurl"
+          "title": "Schema Registry Url"
         }
       },
       "required": [
@@ -992,7 +992,7 @@
           "description": "Output topic",
           "title": "Outputtopic"
         },
-        "schemaRegistryUrl": {
+        "schema_registry_url": {
           "anyOf": [
             {
               "type": "string"
@@ -1003,7 +1003,7 @@
           ],
           "default": null,
           "description": "URL of the schema registry",
-          "title": "Schemaregistryurl"
+          "title": "Schema Registry Url"
         }
       },
       "required": [
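For context on the two schema diffs above: the property key moves from the camelCase alias to the snake_case field name, and the field title follows. A minimal pydantic v2 sketch of how `by_alias` affects `model_json_schema()`; this is not the KPOps schema generator, and `StreamsConfig` with `to_camel` merely stands in for `CamelCaseConfigModel`:

```python
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel


# Stand-in for a CamelCaseConfigModel-style model; not the actual KPOps class.
class StreamsConfig(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

    schema_registry_url: str | None = Field(
        default=None, description="URL of the schema registry"
    )


# With aliases the property key is camelCase, without them it is snake_case,
# which is the difference visible in defaults.json and pipeline.json above.
print(StreamsConfig.model_json_schema(by_alias=True)["properties"].keys())
# dict_keys(['schemaRegistryUrl'])
print(StreamsConfig.model_json_schema(by_alias=False)["properties"].keys())
# dict_keys(['schema_registry_url'])
```

The `anyOf` string/null block in the hunks is unchanged; it comes from the field being `str | None` with a `null` default, so only the key and title move here.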
4 changes: 2 additions & 2 deletions kpops/cli/main.py
@@ -440,7 +440,7 @@ async def async_reset():
             pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True)
             await pipeline_tasks
         else:
-            for component in pipeline.components:
+            for component in reversed(pipeline.components):
                 await reset_runner(component)
 
     asyncio.run(async_reset())
@@ -481,7 +481,7 @@ async def async_clean():
             pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True)
             await pipeline_tasks
         else:
-            for component in pipeline.components:
+            for component in reversed(pipeline.components):
                 await clean_runner(component)
 
     asyncio.run(async_clean())
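The sequential fallback now mirrors the parallel path, which already passes `reverse=True` to `build_execution_graph`: deploy walks the pipeline front to back, while clean and reset walk it back to front. A toy sketch of that ordering; the component names are made up, and real KPOps components are model objects rather than strings:

```python
import asyncio

# Hypothetical component names for illustration only.
pipeline_components = ["producer-app", "streams-app", "sink-connector"]


async def deploy(component: str) -> None:
    print(f"deploy {component}")


async def clean(component: str) -> None:
    print(f"clean {component}")


async def main() -> None:
    for component in pipeline_components:  # deploy in declaration order
        await deploy(component)
    for component in reversed(pipeline_components):  # tear down in reverse, as in the fix
        await clean(component)


asyncio.run(main())
```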
72 changes: 63 additions & 9 deletions kpops/components/base_components/base_defaults_component.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import json
 import logging
 from abc import ABC
 from collections.abc import Sequence
@@ -21,11 +22,16 @@
 from kpops.component_handlers import ComponentHandlers
 from kpops.config import KpopsConfig
 from kpops.utils import cached_classproperty
-from kpops.utils.dict_ops import update_nested, update_nested_pair
+from kpops.utils.dict_ops import (
+    generate_substitution,
+    update_nested,
+    update_nested_pair,
+)
 from kpops.utils.docstring import describe_attr
 from kpops.utils.environment import ENV
 from kpops.utils.pydantic import DescConfigModel, issubclass_patched, to_dash
-from kpops.utils.yaml import load_yaml_file
+from kpops.utils.types import JsonType
+from kpops.utils.yaml import load_yaml_file, substitute_nested
 
 try:
     from typing import Self
@@ -54,7 +60,7 @@ class BaseDefaultsComponent(DescConfigModel, ABC):
     )
 
     enrich: SkipJsonSchema[bool] = Field(
-        default=False,
+        default=True,
         description=describe_attr("enrich", __doc__),
         exclude=True,
     )
@@ -70,21 +76,33 @@ class BaseDefaultsComponent(DescConfigModel, ABC):
     )
     validate_: SkipJsonSchema[bool] = Field(
         validation_alias=AliasChoices("validate", "validate_"),
-        default=True,
+        default=False,
         description=describe_attr("validate", __doc__),
         exclude=True,
     )
 
-    @pydantic.model_validator(mode="before")
-    @classmethod
-    def enrich_component(cls, values: dict[str, Any]) -> dict[str, Any]:
+    def __init__(self, **values: Any) -> None:
         if values.get("enrich", True):
+            cls = self.__class__
             values = cls.extend_with_defaults(**values)
-        return values
+            tmp_self = cls(**values, enrich=False)
+            values = tmp_self.model_dump(mode="json", by_alias=True)
+            values = cls.substitute_in_component(tmp_self.config, **values)
+            # HACK: why is double substitution necessary for test_substitute_in_component
+            values = cls.substitute_in_component(tmp_self.config, **values)
+            self.__init__(
+                enrich=False,
+                validate=True,
+                config=tmp_self.config,
+                handlers=tmp_self.handlers,
+                **values,
+            )
+        else:
+            super().__init__(**values)
 
     @pydantic.model_validator(mode="after")
     def validate_component(self) -> Self:
-        if self.validate_:
+        if not self.enrich and self.validate_:
             self._validate_custom()
         return self
 
@@ -113,6 +131,42 @@ def gen_parents():
 
         return tuple(gen_parents())
 
+    @classmethod
+    def substitute_in_component(
+        cls, config: KpopsConfig, **component_data: Any
+    ) -> dict[str, Any]:
+        """Substitute all $-placeholders in a component in dict representation.
+
+        :param component_as_dict: Component represented as dict
+        :return: Updated component
+        """
+        # Leftover variables that were previously introduced in the component by the substitution
+        # functions, still hardcoded, because of their names.
+        # TODO(Ivan Yordanov): Get rid of them
+        substitution_hardcoded: dict[str, JsonType] = {
+            "error_topic_name": config.topic_name_config.default_error_topic_name,
+            "output_topic_name": config.topic_name_config.default_output_topic_name,
+        }
+        component_substitution = generate_substitution(
+            component_data,
+            "component",
+            substitution_hardcoded,
+            separator=".",
+        )
+        substitution = generate_substitution(
+            config.model_dump(mode="json"),
+            "config",
+            existing_substitution=component_substitution,
+            separator=".",
+        )
+
+        return json.loads(
+            substitute_nested(
+                json.dumps(component_data),
+                **update_nested_pair(substitution, ENV),
+            )
+        )
+
     @classmethod
     def extend_with_defaults(cls, config: KpopsConfig, **kwargs: Any) -> dict[str, Any]:
         """Merge parent components' defaults with own.
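The new `substitute_in_component` flattens the component and the config into dotted `${component.…}` and `${config.…}` keys, merges in the environment, and substitutes them into the component's JSON dump (the diff even runs it twice, see the HACK comment). A self-contained sketch of the idea under simplified assumptions; it is not the actual `generate_substitution` / `substitute_nested` implementation:

```python
import json

# Simplified stand-ins for generate_substitution and substitute_nested.


def flatten(prefix: str, data: dict, separator: str = ".") -> dict[str, str]:
    """Turn a nested dict into dotted keys under a prefix, e.g. component.app.labels.pipeline."""
    out: dict[str, str] = {}
    for key, value in data.items():
        path = f"{prefix}{separator}{key}"
        if isinstance(value, dict):
            out.update(flatten(path, value, separator))
        else:
            out[path] = str(value)
    return out


def substitute_until_stable(text: str, mapping: dict[str, str]) -> str:
    """Replace ${...} placeholders until a full pass changes nothing, so nested
    placeholders resolve across passes."""
    while True:
        new_text = text
        for key, value in mapping.items():
            new_text = new_text.replace("${" + key + "}", value)
        if new_text == text:
            return new_text
        text = new_text


component = {"name": "my-app", "app": {"labels": {"pipeline": "${component.name}"}}}
mapping = flatten("component", component)
print(json.loads(substitute_until_stable(json.dumps(component), mapping)))
# {'name': 'my-app', 'app': {'labels': {'pipeline': 'my-app'}}}
```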
8 changes: 6 additions & 2 deletions kpops/components/base_components/kafka_app.py
@@ -3,7 +3,7 @@
 import logging
 from abc import ABC
 
-from pydantic import ConfigDict, Field
+from pydantic import AliasChoices, ConfigDict, Field
 from typing_extensions import override
 
 from kpops.component_handlers.helm_wrapper.model import (
@@ -28,7 +28,11 @@ class KafkaStreamsConfig(CamelCaseConfigModel, DescConfigModel):
 
     brokers: str = Field(default=..., description=describe_attr("brokers", __doc__))
     schema_registry_url: str | None = Field(
-        default=None, description=describe_attr("schema_registry_url", __doc__)
+        default=None,
+        validation_alias=AliasChoices(
+            "schema_registry_url", "schemaRegistryUrl"
+        ),  # TODO: same for other camelcase fields, avoids duplicates during enrichment
+        description=describe_attr("schema_registry_url", __doc__),
     )
 
     model_config = ConfigDict(
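The `AliasChoices` change lets the field validate from either the snake_case name or the camelCase alias, so a dict produced during enrichment with `model_dump(by_alias=True)` can be validated again without ending up with both `schema_registry_url` and `schemaRegistryUrl`. A generic pydantic v2 sketch, not the KPOps `KafkaStreamsConfig`; the alias generator stands in for `CamelCaseConfigModel`:

```python
from pydantic import AliasChoices, BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel


class StreamsConfig(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

    schema_registry_url: str | None = Field(
        default=None,
        # Accept both spellings during validation; serialization stays camelCase.
        validation_alias=AliasChoices("schema_registry_url", "schemaRegistryUrl"),
    )


dumped = StreamsConfig(schema_registry_url="http://localhost:8081").model_dump(by_alias=True)
print(dumped)                   # {'schemaRegistryUrl': 'http://localhost:8081'}
print(StreamsConfig(**dumped))  # accepted again thanks to the camelCase validation alias
```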
11 changes: 10 additions & 1 deletion kpops/components/base_components/kafka_connector.py
@@ -148,7 +148,6 @@ def connector_config_should_have_component_name(
         app["name"] = component_name
         return KafkaConnectorConfig(**app)
 
-    @computed_field
     @cached_property
     def _resetter(self) -> KafkaConnectorResetter:
         kwargs: dict[str, Any] = {}
@@ -218,6 +217,11 @@ class KafkaSourceConnector(KafkaConnector):
 
     _connector_type: KafkaConnectorType = PrivateAttr(KafkaConnectorType.SOURCE)
 
+    @computed_field
+    @cached_property
+    def _resetter(self) -> KafkaConnectorResetter:
+        return super()._resetter
+
     @override
     def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:
         msg = "Kafka source connector doesn't support FromSection"
@@ -240,6 +244,11 @@ class KafkaSinkConnector(KafkaConnector):
 
     _connector_type: KafkaConnectorType = PrivateAttr(KafkaConnectorType.SINK)
 
+    @computed_field
+    @cached_property
+    def _resetter(self) -> KafkaConnectorResetter:
+        return super()._resetter
+
     @property
     @override
     def input_topics(self) -> list[str]:
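Moving `@computed_field` from the abstract `KafkaConnector` onto the concrete source and sink connectors keeps the resetter out of the base class's serialization while still exposing it on the classes that actually get dumped. A generic pydantic v2 sketch of the mechanism, using a toy `Connector` class rather than the KPOps one:

```python
from functools import cached_property

from pydantic import BaseModel, computed_field


class Connector(BaseModel):
    name: str

    # A plain cached_property is invisible to model_dump(); adding @computed_field
    # makes the value computed once, cached, and included in serialization.
    @computed_field
    @cached_property
    def resetter_name(self) -> str:
        return f"{self.name}-resetter"


connector = Connector(name="my-sink")
print(connector.model_dump())  # {'name': 'my-sink', 'resetter_name': 'my-sink-resetter'}
```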
5 changes: 3 additions & 2 deletions kpops/components/streams_bootstrap/producer/producer_app.py
@@ -1,6 +1,6 @@
 from functools import cached_property
 
-from pydantic import Field
+from pydantic import Field, computed_field
 from typing_extensions import override
 
 from kpops.components.base_components.kafka_app import (
@@ -51,12 +51,13 @@ class ProducerApp(KafkaApp, StreamsBootstrap):
         description=describe_attr("from_", __doc__),
     )
 
+    @computed_field
     @cached_property
     def _cleaner(self) -> ProducerAppCleaner:
         return ProducerAppCleaner(
             config=self.config,
             handlers=self.handlers,
-            **self.model_dump(),
+            **self.model_dump(exclude={"_cleaner"}),
         )
 
     @override
5 changes: 3 additions & 2 deletions kpops/components/streams_bootstrap/streams/streams_app.py
@@ -1,6 +1,6 @@
 from functools import cached_property
 
-from pydantic import Field
+from pydantic import Field, computed_field
 from typing_extensions import override
 
 from kpops.components.base_components.kafka_app import (
@@ -33,12 +33,13 @@ class StreamsApp(KafkaApp, StreamsBootstrap):
         description=describe_attr("app", __doc__),
     )
 
+    @computed_field
    @cached_property
     def _cleaner(self) -> StreamsAppCleaner:
         return StreamsAppCleaner(
             config=self.config,
             handlers=self.handlers,
-            **self.model_dump(),
+            **self.model_dump(exclude={"_cleaner"}),
         )
 
     @property
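Same pattern in both `ProducerApp` and `StreamsApp` above: once `_cleaner` is a computed field it becomes part of `model_dump()`, so the dump forwarded into the cleaner's constructor has to exclude it, otherwise the cleaner would be asked to serialize itself. A toy sketch of why the `exclude` is needed, using a hypothetical `App` model rather than KPOps code:

```python
from functools import cached_property

from pydantic import BaseModel, computed_field


class App(BaseModel):
    name: str

    @computed_field
    @cached_property
    def cleaner(self) -> dict:
        # In KPOps this builds a cleaner component from the app's own dump;
        # a plain dict stands in here to keep the sketch self-contained.
        # Without the exclude, dumping the model would re-enter this property
        # and recurse until it fails.
        return {"app": self.model_dump(exclude={"cleaner"})}


app = App(name="my-streams-app")
print(app.cleaner)       # {'app': {'name': 'my-streams-app'}}
print(app.model_dump())  # includes 'cleaner' exactly once, no recursion
```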