From bcd3865fc5af56b0aeb80deb08f276bd4fd511af Mon Sep 17 00:00:00 2001 From: Alejandro Jaramillo Date: Mon, 15 Jan 2024 17:24:11 +0100 Subject: [PATCH] Execute operations in parallel (#372) Closes #285 --------- Co-authored-by: Ivan Yordanov --- docs/docs/user/references/cli-commands.md | 4 + kpops/cli/main.py | 171 ++++++++++++------ kpops/component_handlers/helm_wrapper/helm.py | 23 ++- kpops/components/base_components/helm_app.py | 4 +- .../base_components/kafka_connector.py | 36 ++-- kpops/pipeline.py | 79 +++++++- poetry.lock | 42 ++++- pyproject.toml | 1 + tests/cli/test_pipeline_steps.py | 39 ++-- .../helm_wrapper/test_helm_wrapper.py | 61 ++++--- .../kafka_connect/test_connect_wrapper.py | 2 +- tests/components/test_helm_app.py | 3 +- tests/components/test_kafka_connector.py | 4 +- .../resources/parallel-pipeline/config.yaml | 15 ++ .../resources/parallel-pipeline/defaults.yaml | 27 +++ .../resources/parallel-pipeline/pipeline.yaml | 64 +++++++ tests/pipeline/test_generate.py | 121 +++++++++++++ 17 files changed, 572 insertions(+), 124 deletions(-) create mode 100644 tests/pipeline/resources/parallel-pipeline/config.yaml create mode 100644 tests/pipeline/resources/parallel-pipeline/defaults.yaml create mode 100644 tests/pipeline/resources/parallel-pipeline/pipeline.yaml diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 0a7617224..29188632e 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -47,6 +47,7 @@ $ kpops clean [OPTIONS] PIPELINE_PATH * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run] * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose] +* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel] * `--help`: Show this message and exit. ## `kpops deploy` @@ -73,6 +74,7 @@ $ kpops deploy [OPTIONS] PIPELINE_PATH * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run] * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose] +* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel] * `--help`: Show this message and exit. ## `kpops destroy` @@ -99,6 +101,7 @@ $ kpops destroy [OPTIONS] PIPELINE_PATH * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run] * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose] +* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel] * `--help`: Show this message and exit. ## `kpops generate` @@ -175,6 +178,7 @@ $ kpops reset [OPTIONS] PIPELINE_PATH * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. 
defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run] * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose] +* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel] * `--help`: Show this message and exit. ## `kpops schema` diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 96384696f..98073ce2f 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -2,7 +2,6 @@ import asyncio import logging -from collections.abc import Iterator from enum import Enum from pathlib import Path from typing import TYPE_CHECKING, Optional @@ -33,6 +32,8 @@ from kpops.utils.yaml import print_yaml if TYPE_CHECKING: + from collections.abc import Awaitable, Callable, Coroutine, Iterator + from kpops.components.base_components import PipelineComponent @@ -94,6 +95,13 @@ ) +PARALLEL: bool = typer.Option( + False, + "--parallel/--no-parallel", + help="Run the command in parallel", +) + + class FilterType(str, Enum): INCLUDE = "include" EXCLUDE = "exclude" @@ -184,6 +192,26 @@ def is_in_steps(component: PipelineComponent) -> bool: return filtered_steps +def get_reverse_concurrently_tasks_to_execute( + pipeline: Pipeline, + steps: str | None, + filter_type: FilterType, + runner: Callable[[PipelineComponent], Coroutine], +) -> Awaitable: + steps_to_apply = reverse_pipeline_steps(pipeline, steps, filter_type) + return pipeline.build_execution_graph_from(list(steps_to_apply), True, runner) + + +def get_concurrently_tasks_to_execute( + pipeline: Pipeline, + steps: str | None, + filter_type: FilterType, + runner: Callable[[PipelineComponent], Coroutine], +) -> Awaitable: + steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type) + return pipeline.build_execution_graph_from(steps_to_apply, False, runner) + + def get_steps_to_apply( pipeline: Pipeline, steps: str | None, filter_type: FilterType ) -> list[PipelineComponent]: @@ -284,6 +312,7 @@ def generate( environment, verbose, ) + pipeline = setup_pipeline(pipeline_path, kpops_config, environment) if output: print_yaml(pipeline.to_yaml()) @@ -336,22 +365,31 @@ def deploy( environment: Optional[str] = ENVIRONMENT, dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, + parallel: bool = PARALLEL, ): - kpops_config = create_kpops_config( - config, - defaults, - dotenv, - environment, - verbose, - ) - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - - steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type) + async def deploy_runner(component: PipelineComponent): + log_action("Deploy", component) + await component.deploy(dry_run) async def async_deploy(): - for component in steps_to_apply: - log_action("Deploy", component) - await component.deploy(dry_run) + kpops_config = create_kpops_config( + config, + defaults, + dotenv, + environment, + verbose, + ) + pipeline = setup_pipeline(pipeline_path, kpops_config, environment) + + if parallel: + pipeline_tasks = get_concurrently_tasks_to_execute( + pipeline, steps, filter_type, deploy_runner + ) + await pipeline_tasks + else: + steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type) + for component in steps_to_apply: + await deploy_runner(component) asyncio.run(async_deploy()) @@ -367,21 +405,32 @@ def destroy( environment: Optional[str] = ENVIRONMENT, dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, + parallel: bool = PARALLEL, ): - kpops_config = create_kpops_config( - config, - defaults, - 
dotenv, - environment, - verbose, - ) - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) + async def destroy_runner(component: PipelineComponent): + log_action("Destroy", component) + await component.destroy(dry_run) async def async_destroy(): - for component in pipeline_steps: - log_action("Destroy", component) - await component.destroy(dry_run) + kpops_config = create_kpops_config( + config, + defaults, + dotenv, + environment, + verbose, + ) + + pipeline = setup_pipeline(pipeline_path, kpops_config, environment) + + if parallel: + pipeline_tasks = get_reverse_concurrently_tasks_to_execute( + pipeline, steps, filter_type, destroy_runner + ) + await pipeline_tasks + else: + pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) + for component in pipeline_steps: + await destroy_runner(component) asyncio.run(async_destroy()) @@ -397,22 +446,31 @@ def reset( environment: Optional[str] = ENVIRONMENT, dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, + parallel: bool = PARALLEL, ): - kpops_config = create_kpops_config( - config, - defaults, - dotenv, - environment, - verbose, - ) - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) + async def reset_runner(component: PipelineComponent): + log_action("Reset", component) + await component.destroy(dry_run) + await component.reset(dry_run) async def async_reset(): - for component in pipeline_steps: - log_action("Reset", component) - await component.destroy(dry_run) - await component.reset(dry_run) + kpops_config = create_kpops_config( + config, + defaults, + dotenv, + environment, + verbose, + ) + pipeline = setup_pipeline(pipeline_path, kpops_config, environment) + if parallel: + pipeline_tasks = get_reverse_concurrently_tasks_to_execute( + pipeline, steps, filter_type, reset_runner + ) + await pipeline_tasks + else: + pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) + for component in pipeline_steps: + await reset_runner(component) asyncio.run(async_reset()) @@ -428,22 +486,31 @@ def clean( environment: Optional[str] = ENVIRONMENT, dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, + parallel: bool = PARALLEL, ): - kpops_config = create_kpops_config( - config, - defaults, - dotenv, - environment, - verbose, - ) - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) + async def clean_runner(component: PipelineComponent): + log_action("Clean", component) + await component.destroy(dry_run) + await component.clean(dry_run) async def async_clean(): - for component in pipeline_steps: - log_action("Clean", component) - await component.destroy(dry_run) - await component.clean(dry_run) + kpops_config = create_kpops_config( + config, + defaults, + dotenv, + environment, + verbose, + ) + pipeline = setup_pipeline(pipeline_path, kpops_config, environment) + if parallel: + pipeline_steps = get_reverse_concurrently_tasks_to_execute( + pipeline, steps, filter_type, clean_runner + ) + await pipeline_steps + else: + pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) + for component in pipeline_steps: + await clean_runner(component) asyncio.run(async_clean()) diff --git a/kpops/component_handlers/helm_wrapper/helm.py b/kpops/component_handlers/helm_wrapper/helm.py index 8499504ba..2f2c5dcf9 100644 --- 
a/kpops/component_handlers/helm_wrapper/helm.py +++ b/kpops/component_handlers/helm_wrapper/helm.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import logging import re import subprocess @@ -74,7 +75,7 @@ def add_repo( else: self.__execute(["helm", "repo", "update"]) - def upgrade_install( + async def upgrade_install( self, release_name: str, chart: str, @@ -103,9 +104,9 @@ def upgrade_install( command.extend(flags.to_command()) if dry_run: command.append("--dry-run") - return self.__execute(command) + return await self.__async_execute(command) - def uninstall( + async def uninstall( self, namespace: str, release_name: str, @@ -122,7 +123,7 @@ def uninstall( if dry_run: command.append("--dry-run") try: - return self.__execute(command) + return await self.__async_execute(command) except ReleaseNotFoundException: log.warning( f"Release with name {release_name} not found. Could not uninstall app." @@ -229,6 +230,20 @@ def __execute(self, command: list[str]) -> str: log.debug(process.stdout) return process.stdout + async def __async_execute(self, command: list[str]): + command = self.__set_global_flags(command) + log.debug(f"Executing {' '.join(command)}") + proc = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await proc.communicate() + Helm.parse_helm_command_stderr_output(stderr.decode()) + log.debug(stdout) + return stdout.decode() + def __set_global_flags(self, command: list[str]) -> list[str]: if self._context: log.debug(f"Changing the Kubernetes context to {self._context}") diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index af6935578..87d4ba546 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -152,7 +152,7 @@ def deploy_flags(self) -> HelmUpgradeInstallFlags: @override async def deploy(self, dry_run: bool) -> None: - stdout = self.helm.upgrade_install( + stdout = await self.helm.upgrade_install( self.helm_release_name, self.helm_chart, dry_run, @@ -165,7 +165,7 @@ async def deploy(self, dry_run: bool) -> None: @override async def destroy(self, dry_run: bool) -> None: - stdout = self.helm.uninstall( + stdout = await self.helm.uninstall( self.namespace, self.helm_release_name, dry_run, diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index fb8364aa4..b70cb35c7 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -164,7 +164,7 @@ async def clean(self, dry_run: bool) -> None: ) await self.handlers.topic_handler.delete_topics(self.to, dry_run=dry_run) - def _run_connect_resetter( + async def _run_connect_resetter( self, dry_run: bool, retain_clean_jobs: bool, @@ -185,7 +185,7 @@ def _run_connect_resetter( f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}" ) ) - self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) + await self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) log.info( magentaify( @@ -193,7 +193,7 @@ def _run_connect_resetter( ) ) - stdout = self.__install_connect_resetter(dry_run, **kwargs) + stdout = await self.__install_connect_resetter(dry_run, **kwargs) if dry_run: self.dry_run_handler.print_helm_diff( @@ -202,9 +202,11 @@ def _run_connect_resetter( if not retain_clean_jobs: log.info(magentaify("Connector 
Cleanup: uninstall Kafka Resetter.")) - self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) + await self.__uninstall_connect_resetter( + self._resetter_release_name, dry_run + ) - def __install_connect_resetter( + async def __install_connect_resetter( self, dry_run: bool, **kwargs, @@ -214,7 +216,7 @@ def __install_connect_resetter( :param dry_run: Whether to dry run the command :return: The output of `helm upgrade --install` """ - return self.helm.upgrade_install( + return await self.helm.upgrade_install( release_name=self._resetter_release_name, namespace=self.namespace, chart=self._resetter_helm_chart, @@ -251,13 +253,15 @@ def _get_kafka_connect_resetter_values( **self.resetter_values, } - def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None: + async def __uninstall_connect_resetter( + self, release_name: str, dry_run: bool + ) -> None: """Uninstall connector resetter. :param release_name: Name of the release to be uninstalled :param dry_run: Whether to do a dry run of the command """ - self.helm.uninstall( + await self.helm.uninstall( namespace=self.namespace, release_name=release_name, dry_run=dry_run, @@ -299,19 +303,19 @@ def manifest(self) -> Resource: @override async def reset(self, dry_run: bool) -> None: - self.__run_kafka_connect_resetter(dry_run) + await self.__run_kafka_connect_resetter(dry_run) @override async def clean(self, dry_run: bool) -> None: await super().clean(dry_run) - self.__run_kafka_connect_resetter(dry_run) + await self.__run_kafka_connect_resetter(dry_run) - def __run_kafka_connect_resetter(self, dry_run: bool) -> None: + async def __run_kafka_connect_resetter(self, dry_run: bool) -> None: """Run the connector resetter. :param dry_run: Whether to do a dry run of the command """ - self._run_connect_resetter( + await self._run_connect_resetter( dry_run=dry_run, retain_clean_jobs=self.config.retain_clean_jobs, offset_topic=self.offset_topic, @@ -356,14 +360,14 @@ def set_error_topic(self, topic_name: str) -> None: @override async def reset(self, dry_run: bool) -> None: - self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=False) + await self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=False) @override async def clean(self, dry_run: bool) -> None: await super().clean(dry_run) - self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True) + await self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True) - def __run_kafka_connect_resetter( + async def __run_kafka_connect_resetter( self, dry_run: bool, delete_consumer_group: bool ) -> None: """Run the connector resetter. 
@@ -371,7 +375,7 @@ def __run_kafka_connect_resetter( :param dry_run: Whether to do a dry run of the command :param delete_consumer_group: Whether the consumer group should be deleted or not """ - self._run_connect_resetter( + await self._run_connect_resetter( dry_run=dry_run, retain_clean_jobs=self.config.retain_clean_jobs, delete_consumer_group=delete_consumer_group, diff --git a/kpops/pipeline.py b/kpops/pipeline.py index bb5c6e71b..5bf8926f8 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import json import logging from collections import Counter @@ -16,7 +17,7 @@ from kpops.utils.yaml import load_yaml_file, substitute_nested if TYPE_CHECKING: - from collections.abc import Iterator + from collections.abc import Awaitable, Callable, Coroutine, Iterator from pathlib import Path from kpops.cli.registry import Registry @@ -40,7 +41,8 @@ class Pipeline(BaseModel): components: list[SerializeAsAny[PipelineComponent]] = Field( default=[], title="Components" ) - graph: nx.DiGraph = Field(default_factory=lambda: nx.DiGraph(), exclude=True) + graph: nx.DiGraph = Field(default_factory=nx.DiGraph, exclude=True) + _component_index: dict[str, PipelineComponent | None] = {} class Config: arbitrary_types_allowed = True @@ -57,6 +59,7 @@ def find(self, component_name: str) -> PipelineComponent: raise ValueError(msg) def __add_to_graph(self, component: PipelineComponent): + self._component_index[component.id] = component self.graph.add_node(component.id) for input_topic in component.inputs: @@ -83,6 +86,72 @@ def to_yaml(self) -> str: self.model_dump(mode="json", by_alias=True, exclude_none=True)["components"] ) + def build_execution_graph_from( + self, + components: list[PipelineComponent], + reverse: bool, + runner: Callable[[PipelineComponent], Coroutine], + ) -> Awaitable: + sub_graph_nodes = self.__get_graph_nodes(components) + + async def run_parallel_tasks(coroutines: list[Coroutine]) -> None: + tasks = [] + for coro in coroutines: + tasks.append(asyncio.create_task(coro)) + await asyncio.gather(*tasks) + + async def run_graph_tasks(pending_tasks: list[Awaitable]): + for pending_task in pending_tasks: + await pending_task + + sub_graph = self.graph.subgraph(sub_graph_nodes) + transformed_graph = sub_graph.copy() + + root_node = "root_node_bfs" + # We add an artificial root node and connect every source node (one with + # no predecessors) to it, so the BFS layering below has a single entry point + transformed_graph.add_node(root_node) + + for node in sub_graph: + predecessors = list(sub_graph.predecessors(node)) + if not predecessors: + transformed_graph.add_edge(root_node, node) + + layers_graph: list[list[str]] = list( + nx.bfs_layers(transformed_graph, root_node) + ) + + sorted_tasks = [] + for layer in layers_graph[1:]: + parallel_tasks = self.__get_parallel_tasks_from(layer, runner) + + if parallel_tasks: + sorted_tasks.append(run_parallel_tasks(parallel_tasks)) + + if reverse: + sorted_tasks.reverse() + + return run_graph_tasks(sorted_tasks) + + @staticmethod + def __get_graph_nodes(components: list[PipelineComponent]) -> Iterator[str]: + for component in components: + yield component.id + yield from component.inputs + yield from component.outputs + + def __get_parallel_tasks_from( + self, layer: list[str], runner: Callable[[PipelineComponent], Coroutine] + ) -> list[Coroutine]: + parallel_tasks = [] + + for node_in_layer in layer: + component = self._component_index[node_in_layer] + if component is not None:
parallel_tasks.append(runner(component)) + + return parallel_tasks + def __validate_graph(self) -> None: if not nx.is_directed_acyclic_graph(self.graph): msg = "Pipeline is not a valid DAG." @@ -93,12 +162,14 @@ def validate(self) -> None: self.__validate_graph() def __add_output(self, output_topic: str, source: str) -> None: + self._component_index[output_topic] = None self.graph.add_node(output_topic) self.graph.add_edge(source, output_topic) - def __add_input(self, input_topic: str, component_node_name: str) -> None: + def __add_input(self, input_topic: str, target: str) -> None: + self._component_index[input_topic] = None self.graph.add_node(input_topic) - self.graph.add_edge(input_topic, component_node_name) + self.graph.add_edge(input_topic, target) def validate_unique_names(self) -> None: step_names = [component.full_name for component in self.components] diff --git a/poetry.lock b/poetry.lock index 9b3867251..8b6bf5779 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "aiofiles" @@ -263,6 +263,20 @@ files = [ [package.extras] test = ["pytest (>=6)"] +[[package]] +name = "faker" +version = "22.0.0" +description = "Faker is a Python package that generates fake data for you." +optional = false +python-versions = ">=3.8" +files = [ + {file = "Faker-22.0.0-py3-none-any.whl", hash = "sha256:9c22c0a734ca01c6e4f2259eab5dab9081905a9d67b27272aea5c9feeb5a3789"}, + {file = "Faker-22.0.0.tar.gz", hash = "sha256:1d5dc0a75da7bc40741ee4c84d99dc087b97bd086d4222ad06ac4dd2219bcf3f"}, +] + +[package.dependencies] +python-dateutil = ">=2.4" + [[package]] name = "fastavro" version = "1.7.0" @@ -824,6 +838,30 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "polyfactory" +version = "2.13.0" +description = "Mock data generation factories" +optional = false +python-versions = "<4.0,>=3.8" +files = [ + {file = "polyfactory-2.13.0-py3-none-any.whl", hash = "sha256:03acb0718f4efb2458c62eb8a2c888294c5b5bf2db31e0efc15a57ecc9eb3c2e"}, + {file = "polyfactory-2.13.0.tar.gz", hash = "sha256:d1e6d8952789de61dca2c32f3e3c9362d7681cf405cf9a41267915e0e33f7639"}, +] + +[package.dependencies] +faker = "*" +typing-extensions = "*" + +[package.extras] +attrs = ["attrs (>=22.2.0)"] +beanie = ["beanie", "pydantic[email]"] +full = ["attrs", "beanie", "msgspec", "odmantic", "pydantic", "sqlalchemy"] +msgspec = ["msgspec"] +odmantic = ["odmantic (<1.0.0)", "pydantic[email]"] +pydantic = ["pydantic[email]"] +sqlalchemy = ["sqlalchemy (>=1.4.29)"] + [[package]] name = "pre-commit" version = "2.20.0" @@ -1906,4 +1944,4 @@ watchmedo = ["PyYAML (>=3.10)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "8b1a7a9c64802b71772782cbeca71a6510291a2e5d872213aacf823025d64a2e" +content-hash = "16666a41d3308c56b3aa59e74814dbd9898f2d780c813e9696f32ae560b3daa0" diff --git a/pyproject.toml b/pyproject.toml index 0547b52ce..790074ab8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,6 +54,7 @@ pytest-rerunfailures = "^11.1.2" pytest-asyncio = "^0.21.1" pytest-httpx = "^0.22.0" pytablewriter = { extras = ["from"], version = "^1.0.0" } +polyfactory = "^2.13.0" [tool.poetry.group.docs] optional = true diff --git a/tests/cli/test_pipeline_steps.py b/tests/cli/test_pipeline_steps.py index 3debeb4cc..fe0cfe68e 100644 --- a/tests/cli/test_pipeline_steps.py +++ 
b/tests/cli/test_pipeline_steps.py @@ -1,38 +1,45 @@ -from dataclasses import dataclass, field -from typing import cast from unittest.mock import MagicMock import pytest +from polyfactory.factories.pydantic_factory import ModelFactory from pytest_mock import MockerFixture from kpops.cli.main import FilterType, get_steps_to_apply +from kpops.component_handlers import ( + ComponentHandlers, +) from kpops.components import PipelineComponent +from kpops.components.base_components.models.from_section import FromSection +from kpops.components.base_components.models.to_section import ToSection from kpops.pipeline import Pipeline PREFIX = "example-prefix-" -@dataclass -class TestComponent: - __test__ = False - name: str - id: str - inputs: list[str] = field(default_factory=list) - outputs: list[str] = field(default_factory=list) - prefix: str = PREFIX +class TestComponentFactory(ModelFactory[PipelineComponent]): + to = ToSection() + from_ = FromSection() + enrich = False + validate = False + handlers = ComponentHandlers(None, MagicMock(), MagicMock()) -test_component_1 = TestComponent("example1", "example1") -test_component_2 = TestComponent("example2", "example2") -test_component_3 = TestComponent("example3", "example3") +run_validation = False +test_component_1 = TestComponentFactory.build(run_validation) +test_component_2 = TestComponentFactory.build(run_validation) +test_component_3 = TestComponentFactory.build(run_validation) + +test_component_1.name = "example1" +test_component_2.name = "example2" +test_component_3.name = "example3" @pytest.fixture(autouse=True) def pipeline() -> Pipeline: pipeline = Pipeline() - pipeline.add(cast(PipelineComponent, test_component_1)) - pipeline.add(cast(PipelineComponent, test_component_2)) - pipeline.add(cast(PipelineComponent, test_component_3)) + pipeline.add(test_component_1) + pipeline.add(test_component_2) + pipeline.add(test_component_3) return pipeline diff --git a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py index cdc7e9d9d..6740c72a2 100644 --- a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py +++ b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py @@ -2,7 +2,7 @@ from pathlib import Path from textwrap import dedent from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest from pytest_mock import MockerFixture @@ -36,6 +36,10 @@ def mock_execute(self, mocker: MockerFixture) -> MagicMock: mock_execute.return_value = "" return mock_execute + @pytest.fixture() + def run_command_async(self, mocker: MockerFixture) -> MagicMock: + return mocker.patch.object(Helm, "_Helm__async_execute") + @pytest.fixture() def log_warning_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch("kpops.component_handlers.helm_wrapper.helm.log.warning") @@ -50,10 +54,11 @@ def mock_get_version(self, mocker: MockerFixture) -> MagicMock: def helm(self, mock_get_version: MagicMock) -> Helm: return Helm(helm_config=HelmConfig()) - def test_should_call_run_command_method_when_helm_install_with_defaults( - self, helm: Helm, mock_execute: MagicMock + @pytest.mark.asyncio() + async def test_should_call_run_command_method_when_helm_install_with_defaults( + self, helm: Helm, run_command_async: AsyncMock ): - helm.upgrade_install( + await helm.upgrade_install( release_name="test-release", chart=f"bakdata-streams-bootstrap/{AppType.STREAMS_APP.value}", dry_run=False, @@ -61,7 +66,8 @@ def 
test_should_call_run_command_method_when_helm_install_with_defaults( values={"commandLine": "test"}, flags=HelmUpgradeInstallFlags(), ) - mock_execute.assert_called_once_with( + + run_command_async.assert_called_once_with( [ "helm", "upgrade", @@ -134,10 +140,11 @@ def test_should_include_configured_tls_parameters_on_add_when_version_is_new( ), ] - def test_should_include_configured_tls_parameters_on_update( - self, helm: Helm, mock_execute: MagicMock + @pytest.mark.asyncio() + async def test_should_include_configured_tls_parameters_on_update( + self, helm: Helm, run_command_async: AsyncMock ): - helm.upgrade_install( + await helm.upgrade_install( release_name="test-release", chart="test-repository/test-chart", dry_run=False, @@ -149,7 +156,7 @@ def test_should_include_configured_tls_parameters_on_update( ), ) - mock_execute.assert_called_once_with( + run_command_async.assert_called_once_with( [ "helm", "upgrade", @@ -169,10 +176,11 @@ def test_should_include_configured_tls_parameters_on_update( ], ) - def test_should_call_run_command_method_when_helm_install_with_non_defaults( - self, helm: Helm, mock_execute: MagicMock + @pytest.mark.asyncio() + async def test_should_call_run_command_method_when_helm_install_with_non_defaults( + self, helm: Helm, run_command_async: AsyncMock ): - helm.upgrade_install( + await helm.upgrade_install( release_name="test-release", chart="test-repository/streams-app", namespace="test-namespace", @@ -188,7 +196,7 @@ def test_should_call_run_command_method_when_helm_install_with_non_defaults( version="2.4.2", ), ) - mock_execute.assert_called_once_with( + run_command_async.assert_called_once_with( [ "helm", "upgrade", @@ -213,26 +221,28 @@ def test_should_call_run_command_method_when_helm_install_with_non_defaults( ], ) - def test_should_call_run_command_method_when_uninstalling_streams_app( - self, helm: Helm, mock_execute: MagicMock + @pytest.mark.asyncio() + async def test_should_call_run_command_method_when_uninstalling_streams_app( + self, helm: Helm, run_command_async: AsyncMock ): - helm.uninstall( + await helm.uninstall( namespace="test-namespace", release_name="test-release", dry_run=False, ) - mock_execute.assert_called_once_with( + run_command_async.assert_called_once_with( ["helm", "uninstall", "test-release", "--namespace", "test-namespace"], ) - def test_should_log_warning_when_release_not_found( + @pytest.mark.asyncio() + async def test_should_log_warning_when_release_not_found( self, + run_command_async: AsyncMock, helm: Helm, - mock_execute: MagicMock, log_warning_mock: MagicMock, ): - mock_execute.side_effect = ReleaseNotFoundException() - helm.uninstall( + run_command_async.side_effect = ReleaseNotFoundException() + await helm.uninstall( namespace="test-namespace", release_name="test-release", dry_run=False, @@ -242,15 +252,16 @@ def test_should_log_warning_when_release_not_found( "Release with name test-release not found. Could not uninstall app." 
) - def test_should_call_run_command_method_when_installing_streams_app__with_dry_run( - self, helm: Helm, mock_execute: MagicMock + @pytest.mark.asyncio() + async def test_should_call_run_command_method_when_installing_streams_app__with_dry_run( + self, helm: Helm, run_command_async: AsyncMock ): - helm.uninstall( + await helm.uninstall( namespace="test-namespace", release_name="test-release", dry_run=True, ) - mock_execute.assert_called_once_with( + run_command_async.assert_called_once_with( [ "helm", "uninstall", diff --git a/tests/component_handlers/kafka_connect/test_connect_wrapper.py b/tests/component_handlers/kafka_connect/test_connect_wrapper.py index a621d6cee..bffe9b413 100644 --- a/tests/component_handlers/kafka_connect/test_connect_wrapper.py +++ b/tests/component_handlers/kafka_connect/test_connect_wrapper.py @@ -125,7 +125,7 @@ async def create_connector_locally(): await timeout( create_connector_locally(), - secs=1, + secs=10, ) log_warning.assert_called_with( diff --git a/tests/components/test_helm_app.py b/tests/components/test_helm_app.py index 787560130..30752190f 100644 --- a/tests/components/test_helm_app.py +++ b/tests/components/test_helm_app.py @@ -37,8 +37,9 @@ def handlers(self) -> ComponentHandlers: @pytest.fixture() def helm_mock(self, mocker: MockerFixture) -> MagicMock: + async_mock = AsyncMock() return mocker.patch( - "kpops.components.base_components.helm_app.Helm" + "kpops.components.base_components.helm_app.Helm", return_value=async_mock ).return_value @pytest.fixture() diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index 290b2ef10..574566309 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -42,8 +42,10 @@ def handlers(self) -> ComponentHandlers: @pytest.fixture(autouse=True) def helm_mock(self, mocker: MockerFixture) -> MagicMock: + async_mock = AsyncMock() return mocker.patch( - "kpops.components.base_components.kafka_connector.Helm" + "kpops.components.base_components.kafka_connector.Helm", + return_value=async_mock, ).return_value @pytest.fixture() diff --git a/tests/pipeline/resources/parallel-pipeline/config.yaml b/tests/pipeline/resources/parallel-pipeline/config.yaml new file mode 100644 index 000000000..1c3b4443f --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline/config.yaml @@ -0,0 +1,15 @@ +topic_name_config: + default_error_topic_name: ${component.name}-dead-letter-topic + default_output_topic_name: ${component.name}-test-topic + +schema_registry: + enabled: true + url: "http://localhost:8081" + +kafka_connect: + url: "http://kafka_connect_url:8083" +kafka_rest: + url: "http://kafka_rest_url:8082" + +defaults_path: .. 
+kafka_brokers: "broker:9092" diff --git a/tests/pipeline/resources/parallel-pipeline/defaults.yaml b/tests/pipeline/resources/parallel-pipeline/defaults.yaml new file mode 100644 index 000000000..bc3398e58 --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline/defaults.yaml @@ -0,0 +1,27 @@ +pipeline-component: + prefix: "" + +kubernetes-app: + namespace: ${NAMESPACE} + +kafka-connector: + namespace: ${NAMESPACE} + +kafka-app: + app: + streams: + brokers: ${brokers} + schemaRegistryUrl: ${schema_registry_url} + +streams-app: + app: + labels: + pipeline: ${pipeline_name} + to: + topics: + ${error_topic_name}: + type: error + partitions_count: 1 + ${output_topic_name}: + type: output + partitions_count: 3 diff --git a/tests/pipeline/resources/parallel-pipeline/pipeline.yaml b/tests/pipeline/resources/parallel-pipeline/pipeline.yaml new file mode 100644 index 000000000..1c461c65d --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline/pipeline.yaml @@ -0,0 +1,64 @@ +- type: producer-app + name: transaction-avro-producer-1 + to: + topics: + my-output-topic-with-multiple-producers: + type: output + partitions_count: 3 + +- type: producer-app + name: transaction-avro-producer-2 + to: + topics: + my-output-topic-with-multiple-producers: + type: output + partitions_count: 3 + +- type: producer-app + name: transaction-avro-producer-3 + to: + topics: + my-output-topic-with-multiple-producers: + type: output + partitions_count: 3 + +- type: streams-app + name: transaction-joiner + +- type: streams-app + name: fraud-detector + +- type: streams-app + name: account-linker + from: + components: + fraud-detector: + type: input + +- type: kafka-sink-connector + name: s3-connector-1 + from: + topics: + account-linker-test-topic: + type: input + app: + connector.class: io.confluent.connect.s3.S3SinkConnector + + +- type: kafka-sink-connector + name: s3-connector-2 + from: + topics: + account-linker-test-topic: + type: input + app: + connector.class: io.confluent.connect.s3.S3SinkConnector + +- type: kafka-sink-connector + name: s3-connector-3 + from: + topics: + account-linker-test-topic: + type: input + app: + connector.class: io.confluent.connect.s3.S3SinkConnector diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 0c0073f0c..46e34b6e3 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -1,4 +1,7 @@ +import asyncio from pathlib import Path +from unittest import mock +from unittest.mock import AsyncMock import pytest import yaml @@ -7,6 +10,7 @@ import kpops from kpops.cli.main import app +from kpops.components import PipelineComponent from kpops.pipeline import ParsingException, ValidationError runner = CliRunner() @@ -668,6 +672,123 @@ def test_validate_topic_and_component_same_name(self): assert component == f"component-{topic}" assert (component, topic) in edges + @pytest.mark.asyncio() + async def test_parallel_execution_graph(self): + pipeline = kpops.generate( + RESOURCE_PATH / "parallel-pipeline/pipeline.yaml", + defaults=RESOURCE_PATH / "parallel-pipeline", + config=RESOURCE_PATH / "parallel-pipeline", + ) + + called_component = AsyncMock() + + sleep_table_components = { + "transaction-avro-producer-1": 1, + "transaction-avro-producer-2": 0, + "transaction-avro-producer-3": 2, + "transaction-joiner": 3, + "fraud-detector": 2, + "account-linker": 0, + "s3-connector-1": 2, + "s3-connector-2": 1, + "s3-connector-3": 0, + } + + async def name_runner(component: PipelineComponent): + await 
asyncio.sleep(sleep_table_components[component.name]) + await called_component(component.name) + + execution_graph = pipeline.build_execution_graph_from( + list(pipeline.components), False, name_runner + ) + + await execution_graph + + assert called_component.mock_calls == [ + mock.call("transaction-avro-producer-2"), + mock.call("transaction-avro-producer-1"), + mock.call("transaction-avro-producer-3"), + mock.call("transaction-joiner"), + mock.call("fraud-detector"), + mock.call("account-linker"), + mock.call("s3-connector-3"), + mock.call("s3-connector-2"), + mock.call("s3-connector-1"), + ] + + @pytest.mark.asyncio() + async def test_subgraph_execution(self): + pipeline = kpops.generate( + RESOURCE_PATH / "parallel-pipeline/pipeline.yaml", + defaults=RESOURCE_PATH / "parallel-pipeline", + config=RESOURCE_PATH / "parallel-pipeline", + ) + + list_of_components = list(pipeline.components) + + called_component = AsyncMock() + + async def name_runner(component: PipelineComponent): + await called_component(component.name) + + execution_graph = pipeline.build_execution_graph_from( + [list_of_components[0], list_of_components[3], list_of_components[6]], + False, + name_runner, + ) + + await execution_graph + + assert called_component.mock_calls == [ + mock.call("transaction-avro-producer-1"), + mock.call("s3-connector-1"), + mock.call("transaction-joiner"), + ] + + @pytest.mark.asyncio() + async def test_parallel_execution_graph_reverse(self): + pipeline = kpops.generate( + RESOURCE_PATH / "parallel-pipeline/pipeline.yaml", + defaults=RESOURCE_PATH / "parallel-pipeline", + config=RESOURCE_PATH / "parallel-pipeline", + ) + + called_component = AsyncMock() + + sleep_table_components = { + "transaction-avro-producer-1": 1, + "transaction-avro-producer-2": 0, + "transaction-avro-producer-3": 2, + "transaction-joiner": 3, + "fraud-detector": 2, + "account-linker": 0, + "s3-connector-1": 2, + "s3-connector-2": 1, + "s3-connector-3": 0, + } + + async def name_runner(component: PipelineComponent): + await asyncio.sleep(sleep_table_components[component.name]) + await called_component(component.name) + + execution_graph = pipeline.build_execution_graph_from( + list(pipeline.components), True, name_runner + ) + + await execution_graph + + assert called_component.mock_calls == [ + mock.call("s3-connector-3"), + mock.call("s3-connector-2"), + mock.call("s3-connector-1"), + mock.call("account-linker"), + mock.call("fraud-detector"), + mock.call("transaction-joiner"), + mock.call("transaction-avro-producer-2"), + mock.call("transaction-avro-producer-1"), + mock.call("transaction-avro-producer-3"), + ] + def test_temp_trim_release_name(self): result = runner.invoke( app,
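
For readers tracing the new `Pipeline.build_execution_graph_from` above (the machinery behind the new opt-in `--parallel` flag), here is a minimal, self-contained sketch of the same layering technique: attach an artificial root to every source node, group nodes into levels with `networkx.bfs_layers`, and run each level concurrently while the levels themselves stay sequential. The toy graph and `run` coroutine below are illustrative stand-ins, not part of this patch.

import asyncio

import networkx as nx


async def run(name: str) -> None:
    # Stand-in for the per-component runner (deploy/destroy/reset/clean).
    print(f"running {name}")
    await asyncio.sleep(0)


async def execute_in_layers(graph: nx.DiGraph, reverse: bool = False) -> None:
    root = "root_node_bfs"
    layered: nx.DiGraph = graph.copy()
    layered.add_node(root)
    # Connect every source node (no predecessors) to an artificial root,
    # giving the BFS a single entry point.
    for node in graph:
        if not list(graph.predecessors(node)):
            layered.add_edge(root, node)

    layers = list(nx.bfs_layers(layered, root))[1:]  # drop the artificial root
    if reverse:  # destroy/reset/clean walk the graph bottom-up
        layers.reverse()

    for layer in layers:
        # Nodes in one layer sit at the same depth from the root; the patch
        # treats them as mutually independent and gathers them concurrently.
        await asyncio.gather(*(run(node) for node in layer))


# Example DAG shaped like the parallel-pipeline test resource:
# two producers feed a joiner, which fans out to two sink connectors.
graph = nx.DiGraph()
graph.add_edges_from([
    ("producer-1", "joiner"),
    ("producer-2", "joiner"),
    ("joiner", "sink-1"),
    ("joiner", "sink-2"),
])
asyncio.run(execute_in_layers(graph))

In the real implementation the graph nodes are component ids interleaved with topic names; `__get_parallel_tasks_from` filters each layer through `_component_index` so that topic nodes (stored as None) are skipped and only components are scheduled.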
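
The other half of the change is making each operation awaitable at all: `Helm.upgrade_install` and `Helm.uninstall` now shell out through `asyncio.create_subprocess_exec` instead of a blocking `subprocess` call, so one slow Helm release no longer stalls the others. A stripped-down sketch of that pattern follows; the plain `RuntimeError` stands in for the patch's `parse_helm_command_stderr_output` handling, and the usage assumes a `helm` binary on the PATH.

import asyncio


async def run_command(*command: str) -> str:
    # Non-blocking analogue of subprocess.run(..., capture_output=True):
    # the event loop stays free to drive other invocations meanwhile.
    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(stderr.decode())
    return stdout.decode()


async def main() -> None:
    # Two independent invocations running concurrently.
    results = await asyncio.gather(
        run_command("helm", "version", "--short"),
        run_command("helm", "env"),
    )
    for result in results:
        print(result)


asyncio.run(main())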