Skip to content

Commit

Permalink
Use pydantic.RootModel for PipelineComponents
Browse files Browse the repository at this point in the history
Fixes #381
  • Loading branch information
disrupted committed Dec 7, 2023
1 parent 10d7b54 commit 6582e34
Show file tree
Hide file tree
Showing 4 changed files with 2,493 additions and 2,531 deletions.
20 changes: 10 additions & 10 deletions kpops/pipeline_generator/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import TYPE_CHECKING

import yaml
from pydantic import BaseModel, SerializeAsAny
from pydantic import RootModel, SerializeAsAny
from rich.console import Console
from rich.syntax import Syntax

Expand Down Expand Up @@ -35,37 +35,37 @@ class ValidationError(Exception):
pass


class PipelineComponents(BaseModel):
class PipelineComponents(RootModel):
"""Stores the pipeline components."""

components: list[SerializeAsAny[PipelineComponent]] = []
root: list[SerializeAsAny[PipelineComponent]] = []

@property
def last(self) -> PipelineComponent:
return self.components[-1]
return self.root[-1]

def find(self, component_name: str) -> PipelineComponent:
for component in self.components:
for component in self.root:
if component_name == component.name:
return component
msg = f"Component {component_name} not found"
raise ValueError(msg)

def add(self, component: PipelineComponent) -> None:
self._populate_component_name(component)
self.components.append(component)
self.root.append(component)

def __bool__(self) -> bool:
return bool(self.components)
return bool(self.root)

def __iter__(self) -> Iterator[PipelineComponent]:
return iter(self.components)
return iter(self.root)

def __len__(self) -> int:
return len(self.components)
return len(self.root)

def validate_unique_names(self) -> None:
step_names = [component.full_name for component in self.components]
step_names = [component.full_name for component in self.root]
duplicates = [name for name, count in Counter(step_names).items() if count > 1]
if duplicates:
msg = f"step names should be unique. duplicate step names: {', '.join(duplicates)}"
Expand Down
Loading

0 comments on commit 6582e34

Please sign in to comment.