diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 001da2146..39eb0de59 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -351,6 +351,11 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None: class KafkaSinkConnector(KafkaConnector): """Kafka sink connector model""" + @override + def get_input_topics(self) -> list[str]: + topics = getattr(self.app, "topics", None) + return topics.split(",") if topics is not None else [] + @override def add_input_topics(self, topics: list[str]) -> None: existing_topics: str | None = getattr(self.app, "topics", None) diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index 0e0ee2a49..93096255a 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -1,6 +1,7 @@ from __future__ import annotations from abc import ABC +from collections.abc import Iterator from pydantic import Extra, Field @@ -58,6 +59,25 @@ def __init__(self, **kwargs) -> None: self.set_input_topics() self.set_output_topics() + def get_input_topics(self) -> list[str]: + """Get all the input topics from config.""" + return [] + + def get_extra_input_topics(self) -> dict[str, list[str]]: + """Get extra input topics list from config.""" + return {} + + def get_output_topic(self) -> str | None: + """Get output topic from config.""" + + def get_extra_output_topics(self) -> dict[str, str]: + """Get extra output topics list from config.""" + return {} + + @property + def id(self) -> str: + return f"component-{self.full_name}" + @property def full_name(self) -> str: return self.prefix + self.name @@ -116,6 +136,18 @@ def set_input_topics(self) -> None: for name, topic in self.from_.topics.items(): self.apply_from_inputs(name, topic) + @property + def inputs(self) -> Iterator[str]: + yield from self.get_input_topics() + for role_topics in self.get_extra_input_topics().values(): + yield from role_topics + + @property + def outputs(self) -> Iterator[str]: + if output_topic := self.get_output_topic(): + yield output_topic + yield from self.get_extra_output_topics().values() + def apply_from_inputs(self, name: str, topic: FromTopic) -> None: """Add a `from` section input to the component config diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index cdf03516d..e94318286 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -44,6 +44,14 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None: case _: super().apply_to_outputs(name, topic) + @override + def get_output_topic(self) -> str | None: + return self.app.streams.output_topic + + @override + def get_extra_output_topics(self) -> dict[str, str]: + return self.app.streams.extra_output_topics + @override def set_output_topic(self, topic_name: str) -> None: self.app.streams.output_topic = topic_name diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index d282c33e6..ba0c1ee39 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -20,6 +20,22 @@ class StreamsApp(KafkaApp): description=describe_attr("app", __doc__), ) + @override + def get_input_topics(self) -> list[str]: + return self.app.streams.input_topics + + @override + def get_extra_input_topics(self) -> dict[str, list[str]]: + return self.app.streams.extra_input_topics + + @override + def get_output_topic(self) -> str | None: + return self.app.streams.output_topic + + @override + def get_extra_output_topics(self) -> dict[str, str]: + return self.app.streams.extra_output_topics + @override def add_input_topics(self, topics: list[str]) -> None: self.app.streams.add_input_topics(topics) diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py index 093a452ea..e23a8c05f 100644 --- a/kpops/pipeline_generator/pipeline.py +++ b/kpops/pipeline_generator/pipeline.py @@ -7,8 +7,9 @@ from contextlib import suppress from pathlib import Path +import networkx as nx import yaml -from pydantic import BaseModel +from pydantic import BaseModel, Field from rich.console import Console from rich.syntax import Syntax @@ -35,6 +36,10 @@ class PipelineComponents(BaseModel): """Stores the pipeline components""" components: list[PipelineComponent] = [] + graph: nx.DiGraph = Field(default=nx.DiGraph(), exclude=True) + + class Config: + arbitrary_types_allowed = True @property def last(self) -> PipelineComponent: @@ -59,6 +64,10 @@ def __iter__(self) -> Iterator[PipelineComponent]: def __len__(self) -> int: return len(self.components) + def validate_graph(self) -> None: + if not nx.is_directed_acyclic_graph(self.graph): + raise ValueError("Pipeline is not a valid DAG.") + def validate_unique_names(self) -> None: step_names = [component.full_name for component in self.components] duplicates = [name for name, count in Counter(step_names).items() if count > 1] @@ -67,6 +76,24 @@ def validate_unique_names(self) -> None: f"step names should be unique. duplicate step names: {', '.join(duplicates)}" ) + def generate_graph(self) -> None: + for component in self.components: + self.graph.add_node(component.id) + + for input_topic in component.inputs: + self.__add_input(input_topic, component.id) + + for output_topic in component.outputs: + self.__add_output(output_topic, component.id) + + def __add_output(self, output_topic: str, source: str) -> None: + self.graph.add_node(output_topic) + self.graph.add_edge(source, output_topic) + + def __add_input(self, input_topic: str, component_node_name: str) -> None: + self.graph.add_node(input_topic) + self.graph.add_edge(input_topic, component_node_name) + @staticmethod def _populate_component_name(component: PipelineComponent) -> None: # TODO: remove with suppress( @@ -109,6 +136,7 @@ def __init__( self.registry = registry self.env_components_index = create_env_components_index(environment_components) self.parse_components(component_list) + self.components.generate_graph() self.validate() @classmethod @@ -308,6 +336,7 @@ def substitute_in_component(self, component_as_dict: dict) -> dict: def validate(self) -> None: self.components.validate_unique_names() + self.components.validate_graph() @staticmethod def pipeline_filename_environment(path: Path, config: PipelineConfig) -> Path: diff --git a/poetry.lock b/poetry.lock index a673ba0c0..e28916dfc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -755,6 +755,24 @@ files = [ {file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"}, ] +[[package]] +name = "networkx" +version = "3.1" +description = "Python package for creating and manipulating graphs and networks" +optional = false +python-versions = ">=3.8" +files = [ + {file = "networkx-3.1-py3-none-any.whl", hash = "sha256:4f33f68cb2afcf86f28a45f43efc27a9386b535d567d2127f8f61d51dec58d36"}, + {file = "networkx-3.1.tar.gz", hash = "sha256:de346335408f84de0eada6ff9fafafff9bcda11f0a0dfaa931133debb146ab61"}, +] + +[package.extras] +default = ["matplotlib (>=3.4)", "numpy (>=1.20)", "pandas (>=1.3)", "scipy (>=1.8)"] +developer = ["mypy (>=1.1)", "pre-commit (>=3.2)"] +doc = ["nb2plots (>=0.6)", "numpydoc (>=1.5)", "pillow (>=9.4)", "pydata-sphinx-theme (>=0.13)", "sphinx (>=6.1)", "sphinx-gallery (>=0.12)", "texext (>=0.6.7)"] +extra = ["lxml (>=4.6)", "pydot (>=1.4.2)", "pygraphviz (>=1.10)", "sympy (>=1.10)"] +test = ["codecov (>=2.1)", "pytest (>=7.2)", "pytest-cov (>=4.0)"] + [[package]] name = "nodeenv" version = "1.7.0" @@ -1827,4 +1845,4 @@ watchmedo = ["PyYAML (>=3.10)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "62d203076f7ac783793b7aa4ad6b5263067e6b9d6d13a6965dddef1a62c40eed" +content-hash = "d359a28891312de394390134357843b05abf37d8299b7ebcd8b91b76bd3aa880" diff --git a/pyproject.toml b/pyproject.toml index 3112986f0..4fbc1d769 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ cachetools = "^5.2.0" dictdiffer = "^0.9.0" python-schema-registry-client = "^2.4.1" httpx = "^0.24.1" +networkx = "^3.1" [tool.poetry.group.dev.dependencies] diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 2789798eb..4b7d2d560 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -247,3 +247,37 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea False, ), ] + + def test_get_output_topics( + self, + config: PipelineConfig, + handlers: ComponentHandlers, + ): + producer_app = ProducerApp( + name=self.PRODUCER_APP_NAME, + config=config, + handlers=handlers, + **{ + "namespace": "test-namespace", + "app": { + "namespace": "test-namespace", + "streams": {"brokers": "fake-broker:9092"}, + }, + "to": { + "topics": { + "${output_topic_name}": TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + "extra-topic-1": TopicConfig( + role="first-extra-topic", + partitions_count=10, + ), + } + }, + }, + ) + assert producer_app.get_output_topic() == "${output_topic_name}" + assert producer_app.get_extra_output_topics() == { + "first-extra-topic": "extra-topic-1" + } + assert producer_app.get_input_topics() == [] diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 97e8f2ba8..81b8bef0e 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -443,3 +443,48 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean dry_run, ), ] + + @pytest.mark.asyncio + async def test_get_input_output_topics( + self, config: PipelineConfig, handlers: ComponentHandlers + ): + streams_app = StreamsApp( + name=self.STREAMS_APP_NAME, + config=config, + handlers=handlers, + **{ + "namespace": "test-namespace", + "app": { + "streams": {"brokers": "fake-broker:9092"}, + }, + "from": { + "topics": { + "example-input": {"type": "input"}, + "b": {"type": "input"}, + "a": {"type": "input"}, + "topic-extra2": {"role": "role2"}, + "topic-extra3": {"role": "role2"}, + "topic-extra": {"role": "role1"}, + ".*": {"type": "pattern"}, + "example.*": { + "type": "pattern", + "role": "another-pattern", + }, + } + }, + "to": { + "topics": { + "example-output": {"type": "output"}, + "extra-topic": {"role": "fake-role"}, + } + }, + }, + ) + + assert streams_app.get_input_topics() == ["example-input", "b", "a"] + assert streams_app.get_extra_input_topics() == { + "role1": ["topic-extra"], + "role2": ["topic-extra2", "topic-extra3"], + } + assert streams_app.get_output_topic() == "example-output" + assert streams_app.get_extra_output_topics() == {"fake-role": "extra-topic"} diff --git a/tests/pipeline/resources/pipeline-with-loop/defaults.yaml b/tests/pipeline/resources/pipeline-with-loop/defaults.yaml new file mode 100644 index 000000000..a51f34a2d --- /dev/null +++ b/tests/pipeline/resources/pipeline-with-loop/defaults.yaml @@ -0,0 +1,19 @@ +pipeline-component: + prefix: "" + +kubernetes-app: + namespace: example-namespace + +kafka-connector: + namespace: example-namespace + +kafka-app: + app: + streams: + brokers: 127.0.0.1:9092 + schemaRegistryUrl: 127.0.0.1:8081 + +streams-app: + app: + labels: + pipeline: ${pipeline_name} diff --git a/tests/pipeline/resources/pipeline-with-loop/pipeline.yaml b/tests/pipeline/resources/pipeline-with-loop/pipeline.yaml new file mode 100644 index 000000000..b8f2866f6 --- /dev/null +++ b/tests/pipeline/resources/pipeline-with-loop/pipeline.yaml @@ -0,0 +1,34 @@ +- type: producer-app + name: app1 + app: + image: producer-image + to: + topics: + my-output-topic: + type: output + +- type: streams-app + name: app2 + app: + image: app2-image + from: + topics: + my-output-topic: + type: input + to: + topics: + my-app2-topic: + type: output + +- type: streams-app + name: app3 + app: + image: app3-image + from: + topics: + my-app2-topic: + type: input + to: + topics: + my-output-topic: + type: output diff --git a/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml b/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml index 00b3b2673..8138a83a0 100644 --- a/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml +++ b/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml @@ -9,6 +9,13 @@ kafka-app: schema_registry_url: "${schema_registry_url}" version: "2.4.2" +producer-app: + to: + topics: + ${output_topic_name}: + partitions_count: 3 + + streams-app: # inherits from kafka-app app: streams: @@ -19,7 +26,7 @@ streams-app: # inherits from kafka-app type: error-topic: type: error - extra-topic: + extra-topic-output: role: role from: topics: diff --git a/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/config.yaml b/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/config.yaml new file mode 100644 index 000000000..5c19b0e89 --- /dev/null +++ b/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/config.yaml @@ -0,0 +1,8 @@ +environment: development +defaults_path: .. +brokers: "broker:9092" +helm_diff_config: + enable: false +kafka_connect_host: "kafka_connect_host:8083" +kafka_rest_host: "kafka_rest_host:8082" + diff --git a/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/defaults.yaml b/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/defaults.yaml new file mode 100644 index 000000000..d8144f063 --- /dev/null +++ b/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/defaults.yaml @@ -0,0 +1,27 @@ +pipeline-component: + prefix: "" + +kubernetes-app: + namespace: example-namespace + +kafka-connector: + namespace: example-namespace + +kafka-app: + app: + streams: + brokers: 127.0.0.1:9092 + schemaRegistryUrl: 127.0.0.1:8081 + +streams-app: + app: + labels: + pipeline: ${pipeline_name} + +producer: + to: + topics: + ${output_topic_name}: + type: output + configs: + cleanup.policy: compact,delete diff --git a/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/pipeline.yaml b/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/pipeline.yaml new file mode 100644 index 000000000..5e578f0a2 --- /dev/null +++ b/tests/pipeline/resources/pipelines-with-graphs/same-topic-and-component-name/pipeline.yaml @@ -0,0 +1,8 @@ +- type: streams-app + name: app2-processor + app: + image: some-image + to: + topics: + app2-processor: + type: output diff --git a/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/config.yaml b/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/config.yaml new file mode 100644 index 000000000..55f990f7b --- /dev/null +++ b/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/config.yaml @@ -0,0 +1,7 @@ +environment: development +defaults_path: .. +brokers: "broker:9092" +helm_diff_config: + enable: false +kafka_connect_host: "kafka_connect_host:8083" +kafka_rest_host: "kafka_rest_host:8082" diff --git a/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/defaults.yaml b/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/defaults.yaml new file mode 100644 index 000000000..fbd7623d1 --- /dev/null +++ b/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/defaults.yaml @@ -0,0 +1,28 @@ +pipeline-component: + prefix: "" + +kubernetes-app: + namespace: example-namespace + +kafka-connector: + namespace: example-namespace + +kafka-app: + app: + streams: + brokers: 127.0.0.1:9092 + schemaRegistryUrl: 127.0.0.1:8081 + + +streams-app: + app: + labels: + pipeline: ${pipeline_name} + +producer-app: + to: + topics: + ${output_topic_name}: + type: output + configs: + cleanup.policy: compact,delete diff --git a/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/pipeline.yaml b/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/pipeline.yaml new file mode 100644 index 000000000..25ddbedb9 --- /dev/null +++ b/tests/pipeline/resources/pipelines-with-graphs/simple-pipeline/pipeline.yaml @@ -0,0 +1,13 @@ +- type: producer-app + name: app1 + app: + resources: + limits: + memory: 2G + requests: + memory: 2G + +- type: streams-app + name: app2 + app: + image: some-image diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index af9cde479..d60d597cd 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -191,7 +191,7 @@ def test_substitute_in_component(self, snapshot: SnapshotTest): snapshot.assert_match(enriched_pipeline, "test-pipeline") - @pytest.mark.timeout(0.5) + @pytest.mark.timeout(2) def test_substitute_in_component_infinite_loop(self): with pytest.raises((ValueError, ParsingException)): runner.invoke( @@ -528,10 +528,10 @@ def test_short_topic_definition(self): input_components = enriched_pipeline["components"][4]["from"]["components"] assert "type" not in output_topics["output-topic"] assert output_topics["error-topic"]["type"] == "error" - assert "type" not in output_topics["extra-topic"] + assert "type" not in output_topics["extra-topic-output"] assert "role" not in output_topics["output-topic"] assert "role" not in output_topics["error-topic"] - assert output_topics["extra-topic"]["role"] == "role" + assert output_topics["extra-topic-output"]["role"] == "role" assert "type" not in ["input-topic"] assert "type" not in input_topics["extra-topic"] @@ -587,3 +587,48 @@ def test_validate_unique_step_names(self): ], catch_exceptions=False, ) + + def test_validate_loops_on_pipeline(self): + with pytest.raises(ValueError, match="Pipeline is not a valid DAG."): + runner.invoke( + app, + [ + "generate", + "--pipeline-base-dir", + str(PIPELINE_BASE_DIR_PATH), + str(RESOURCE_PATH / "pipeline-with-loop/pipeline.yaml"), + "--defaults", + str(RESOURCE_PATH / "pipeline-with-loop"), + ], + catch_exceptions=False, + ) + + def test_validate_simple_graph(self): + pipeline = kpops.generate( + RESOURCE_PATH / "pipelines-with-graphs/simple-pipeline/pipeline.yaml", + pipeline_base_dir=PIPELINE_BASE_DIR_PATH, + defaults=RESOURCE_PATH / "pipelines-with-graphs" / "simple-pipeline", + ) + assert len(pipeline.components) == 2 + assert len(pipeline.components.graph.nodes) == 3 + assert len(pipeline.components.graph.edges) == 2 + node_components = list( + filter( + lambda node_id: "component" in node_id, pipeline.components.graph.nodes + ) + ) + assert len(pipeline.components) == len(node_components) + + def test_validate_topic_and_component_same_name(self): + pipeline = kpops.generate( + RESOURCE_PATH + / "pipelines-with-graphs/same-topic-and-component-name/pipeline.yaml", + pipeline_base_dir=PIPELINE_BASE_DIR_PATH, + defaults=RESOURCE_PATH + / "pipelines-with-graphs" + / "same-topic-and-component-name", + ) + component, topic = list(pipeline.components.graph.nodes) + edges = list(pipeline.components.graph.edges) + assert component == f"component-{topic}" + assert (component, topic) in edges