Skip to content

Commit

Permalink
Refactor pipeline generator & representation (#392)
Browse files Browse the repository at this point in the history
Fixes #381
  • Loading branch information
disrupted authored Dec 20, 2023
1 parent dac1cc9 commit f95afe2
Show file tree
Hide file tree
Showing 15 changed files with 2,593 additions and 2,638 deletions.
2 changes: 1 addition & 1 deletion hooks/gen_docs/gen_docs_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kpops.cli.registry import _find_classes
from kpops.components import KafkaConnector, PipelineComponent
from kpops.utils.colorify import redify, yellowify
from kpops.utils.yaml_loading import load_yaml_file
from kpops.utils.yaml import load_yaml_file

PATH_KPOPS_MAIN = ROOT / "kpops/cli/main.py"
PATH_CLI_COMMANDS_DOC = ROOT / "docs/docs/user/references/cli-commands.md"
Expand Down
10 changes: 5 additions & 5 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from kpops.component_handlers.topic.handler import TopicHandler
from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper
from kpops.config import ENV_PREFIX, KpopsConfig
from kpops.pipeline_generator.pipeline import Pipeline
from kpops.pipeline import Pipeline, PipelineGenerator
from kpops.utils.gen_schema import SchemaScope, gen_config_schema, gen_pipeline_schema
from kpops.utils.pydantic import YamlConfigSettingsSource
from kpops.utils.yaml import print_yaml

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -144,9 +145,8 @@ def setup_pipeline(
registry.find_components("kpops.components")

handlers = setup_handlers(components_module, kpops_config)
return Pipeline.load_from_yaml(
pipeline_base_dir, pipeline_path, environment, registry, kpops_config, handlers
)
parser = PipelineGenerator(kpops_config, registry, handlers)
return parser.load_yaml(pipeline_base_dir, pipeline_path, environment)


def setup_handlers(
Expand Down Expand Up @@ -288,7 +288,7 @@ def generate(
)

if not template:
pipeline.print_yaml()
print_yaml(pipeline.to_yaml())

if template:
steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from kpops.utils.docstring import describe_attr
from kpops.utils.environment import ENV
from kpops.utils.pydantic import DescConfigModel, to_dash
from kpops.utils.yaml_loading import load_yaml_file
from kpops.utils.yaml import load_yaml_file

try:
from typing import Self
Expand Down
140 changes: 58 additions & 82 deletions kpops/pipeline_generator/pipeline.py → kpops/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@
import logging
from collections import Counter
from contextlib import suppress
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

import yaml
from pydantic import BaseModel, SerializeAsAny
from rich.console import Console
from rich.syntax import Syntax
from pydantic import Field, RootModel, SerializeAsAny

from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.utils.dict_ops import generate_substitution, update_nested_pair
from kpops.utils.environment import ENV
from kpops.utils.yaml_loading import load_yaml_file, substitute, substitute_nested
from kpops.utils.yaml import load_yaml_file, substitute_nested

if TYPE_CHECKING:
from collections.abc import Iterator
Expand All @@ -35,37 +34,45 @@ class ValidationError(Exception):
pass


class PipelineComponents(BaseModel):
"""Stores the pipeline components."""
class Pipeline(RootModel):
"""Pipeline representation."""

components: list[SerializeAsAny[PipelineComponent]] = []
root: list[SerializeAsAny[PipelineComponent]] = Field(
default=[], title="Components"
)

@property
def last(self) -> PipelineComponent:
return self.components[-1]
return self.root[-1]

def find(self, component_name: str) -> PipelineComponent:
for component in self.components:
for component in self.root:
if component_name == component.name:
return component
msg = f"Component {component_name} not found"
raise ValueError(msg)

def add(self, component: PipelineComponent) -> None:
self._populate_component_name(component)
self.components.append(component)
self.root.append(component)

def __bool__(self) -> bool:
return bool(self.components)
return bool(self.root)

def __iter__(self) -> Iterator[PipelineComponent]:
return iter(self.components)
return iter(self.root)

def __len__(self) -> int:
return len(self.components)
return len(self.root)

def to_yaml(self) -> str:
return yaml.dump(self.model_dump(mode="json", by_alias=True, exclude_none=True))

def validate(self) -> None:
self.validate_unique_names()

def validate_unique_names(self) -> None:
step_names = [component.full_name for component in self.components]
step_names = [component.full_name for component in self.root]
duplicates = [name for name, count in Counter(step_names).items() if count > 1]
if duplicates:
msg = f"step names should be unique. duplicate step names: {', '.join(duplicates)}"
Expand Down Expand Up @@ -97,48 +104,44 @@ def create_env_components_index(
return index


class Pipeline:
def __init__(
@dataclass
class PipelineGenerator:
config: KpopsConfig
registry: Registry
handlers: ComponentHandlers
pipeline: Pipeline = field(init=False, default_factory=Pipeline)

def parse(
self,
component_list: list[dict],
components: list[dict],
environment_components: list[dict],
registry: Registry,
config: KpopsConfig,
handlers: ComponentHandlers,
) -> None:
self.components: PipelineComponents = PipelineComponents()
self.handlers = handlers
self.config = config
self.registry = registry
) -> Pipeline:
"""Parse pipeline from sequence of component dictionaries.
:param components: List of components
:param environment_components: List of environment-specific components
:returns: Initialized pipeline object
"""
self.env_components_index = create_env_components_index(environment_components)
self.parse_components(component_list)
self.validate()

@classmethod
def load_from_yaml(
cls,
base_dir: Path,
path: Path,
environment: str | None,
registry: Registry,
config: KpopsConfig,
handlers: ComponentHandlers,
self.parse_components(components)
self.pipeline.validate()
return self.pipeline

def load_yaml(
self, base_dir: Path, path: Path, environment: str | None
) -> Pipeline:
"""Load pipeline definition from yaml.
The file is often named ``pipeline.yaml``
:param base_dir: Base directory to the pipelines (default is current working directory)
:param path: Path to pipeline definition yaml file
:param registry: Pipeline components registry
:param config: Pipeline config
:param handlers: Component handlers
:raises TypeError: The pipeline definition should contain a list of components
:raises TypeError: The env-specific pipeline definition should contain a list of components
:returns: Initialized pipeline object
"""
Pipeline.set_pipeline_name_env_vars(base_dir, path)
Pipeline.set_environment_name(environment)
PipelineGenerator.set_pipeline_name_env_vars(base_dir, path)
PipelineGenerator.set_environment_name(environment)

main_content = load_yaml_file(path, substitution=ENV)
if not isinstance(main_content, list):
Expand All @@ -148,25 +151,27 @@ def load_from_yaml(
if (
environment
and (
env_file := Pipeline.pipeline_filename_environment(path, environment)
env_file := PipelineGenerator.pipeline_filename_environment(
path, environment
)
).exists()
):
env_content = load_yaml_file(env_file, substitution=ENV)
if not isinstance(env_content, list):
msg = f"The pipeline definition {env_file} should contain a list of components"
raise TypeError(msg)

return cls(main_content, env_content, registry, config, handlers)
return self.parse(main_content, env_content)

def parse_components(self, component_list: list[dict]) -> None:
def parse_components(self, components: list[dict]) -> None:
"""Instantiate, enrich and inflate a list of components.
:param component_list: List of components
:param components: List of components
:raises ValueError: Every component must have a type defined
:raises ParsingException: Error enriching component
:raises ParsingException: All undefined exceptions
"""
for component_data in component_list:
for component_data in components:
try:
try:
component_type: str = component_data["type"]
Expand Down Expand Up @@ -208,21 +213,21 @@ def apply_component(
original_from_component_name,
from_topic,
) in enriched_component.from_.components.items():
original_from_component = self.components.find(
original_from_component = self.pipeline.find(
original_from_component_name
)
inflated_from_component = original_from_component.inflate()[-1]
resolved_from_component = self.components.find(
resolved_from_component = self.pipeline.find(
inflated_from_component.name
)
enriched_component.weave_from_topics(
resolved_from_component.to, from_topic
)
elif self.components:
elif self.pipeline:
# read from previous component
prev_component = self.components.last
prev_component = self.pipeline.last
enriched_component.weave_from_topics(prev_component.to)
self.components.add(enriched_component)
self.pipeline.add(enriched_component)

def enrich_component(
self,
Expand Down Expand Up @@ -251,32 +256,6 @@ def enrich_component(
**component_data,
)

def print_yaml(self, substitution: dict | None = None) -> None:
"""Print the generated pipeline definition.
:param substitution: Substitution dictionary, defaults to None
"""
syntax = Syntax(
substitute(str(self), substitution),
"yaml",
background_color="default",
theme="ansi_dark",
)
Console(
width=1000 # HACK: overwrite console width to avoid truncating output
).print(syntax)

def __iter__(self) -> Iterator[PipelineComponent]:
return iter(self.components)

def __str__(self) -> str:
return yaml.dump(
self.components.model_dump(mode="json", by_alias=True, exclude_none=True)
)

def __len__(self) -> int:
return len(self.components)

def substitute_in_component(self, component_as_dict: dict) -> dict:
"""Substitute all $-placeholders in a component in dict representation.
Expand Down Expand Up @@ -310,15 +289,12 @@ def substitute_in_component(self, component_as_dict: dict) -> dict:
)
)

def validate(self) -> None:
self.components.validate_unique_names()

@staticmethod
def pipeline_filename_environment(pipeline_path: Path, environment: str) -> Path:
"""Add the environment name from the KpopsConfig to the pipeline.yaml path.
:param pipeline_path: Path to pipeline.yaml file
:param config: The KpopsConfig
:param environment: Environment name
:returns: Path to the pipeline_<environment>.yaml file next to the given pipeline path (absolute only if pipeline_path is absolute)
"""
return pipeline_path.with_stem(f"{pipeline_path.stem}_{environment}")
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion kpops/utils/pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from kpops.utils.dict_ops import update_nested_pair
from kpops.utils.docstring import describe_object
from kpops.utils.yaml_loading import load_yaml_file
from kpops.utils.yaml import load_yaml_file


def to_camel(s: str) -> str:
Expand Down
19 changes: 19 additions & 0 deletions kpops/utils/yaml_loading.py → kpops/utils/yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import yaml
from cachetools import cached
from cachetools.keys import hashkey
from rich.console import Console
from rich.syntax import Syntax

from kpops.utils.dict_ops import ImprovedTemplate

Expand Down Expand Up @@ -79,3 +81,20 @@ def substitute_nested(input: str, **kwargs) -> str:
msg = "An infinite loop condition detected. Check substitution variables."
raise ValueError(msg)
return old_str


def print_yaml(input: str, *, substitution: dict | None = None) -> None:
"""Print YAML to console with syntax highlighting.
:param input: YAML content
:param substitution: Substitution dictionary, defaults to None
"""
syntax = Syntax(
substitute(input, substitution),
"yaml",
background_color="default",
theme="ansi_dark",
)
Console(
width=1000 # HACK: overwrite console width to avoid truncating output
).print(syntax)
19 changes: 7 additions & 12 deletions tests/cli/test_pipeline_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from pytest_mock import MockerFixture

from kpops.cli.main import FilterType, get_steps_to_apply
from kpops.pipeline_generator.pipeline import Pipeline
from kpops.components import PipelineComponent
from kpops.pipeline import Pipeline

PREFIX = "example-prefix-"

Expand All @@ -25,17 +26,11 @@ class TestComponent:

@pytest.fixture(autouse=True)
def pipeline() -> Pipeline:
class TestPipeline:
components = [
test_component_1,
test_component_2,
test_component_3,
]

def __iter__(self):
return iter(self.components)

return cast(Pipeline, TestPipeline())
pipeline = Pipeline()
pipeline.add(cast(PipelineComponent, test_component_1))
pipeline.add(cast(PipelineComponent, test_component_2))
pipeline.add(cast(PipelineComponent, test_component_3))
return pipeline


@pytest.fixture(autouse=True)
Expand Down
Loading

0 comments on commit f95afe2

Please sign in to comment.