Skip to content

Commit

Permalink
Add support for pipeline steps parallelization (#312)
Browse files Browse the repository at this point in the history
closes #177

---------

Co-authored-by: Ivan Yordanov <[email protected]>
  • Loading branch information
irux and sujuka99 authored Jan 29, 2024
1 parent ad11fde commit 86f8db9
Show file tree
Hide file tree
Showing 46 changed files with 1,564 additions and 551 deletions.
4 changes: 4 additions & 0 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ $ kpops clean [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops deploy`
Expand All @@ -73,6 +74,7 @@ $ kpops deploy [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops destroy`
Expand All @@ -99,6 +101,7 @@ $ kpops destroy [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops generate`
Expand Down Expand Up @@ -175,6 +178,7 @@ $ kpops reset [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops schema`
Expand Down
180 changes: 132 additions & 48 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import asyncio
import logging
from collections.abc import Iterator
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Optional
Expand Down Expand Up @@ -32,6 +32,8 @@
from kpops.utils.yaml import print_yaml

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine, Iterator

from kpops.components.base_components import PipelineComponent


Expand Down Expand Up @@ -92,6 +94,13 @@
help="Whether to dry run the command or execute it",
)

PARALLEL: bool = typer.Option(
False,
"--parallel/--no-parallel",
rich_help_panel="EXPERIMENTAL: features in preview, not production-ready",
help="Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially.",
)


class FilterType(str, Enum):
INCLUDE = "include"
Expand Down Expand Up @@ -183,6 +192,26 @@ def is_in_steps(component: PipelineComponent) -> bool:
return filtered_steps


def get_reverse_concurrently_tasks_to_execute(
pipeline: Pipeline,
steps: str | None,
filter_type: FilterType,
runner: Callable[[PipelineComponent], Coroutine],
) -> Awaitable:
steps_to_apply = reverse_pipeline_steps(pipeline, steps, filter_type)
return pipeline.build_execution_graph_from(list(steps_to_apply), True, runner)


def get_concurrently_tasks_to_execute(
pipeline: Pipeline,
steps: str | None,
filter_type: FilterType,
runner: Callable[[PipelineComponent], Coroutine],
) -> Awaitable:
steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
return pipeline.build_execution_graph_from(steps_to_apply, False, runner)


def get_steps_to_apply(
pipeline: Pipeline, steps: str | None, filter_type: FilterType
) -> list[PipelineComponent]:
Expand Down Expand Up @@ -283,6 +312,7 @@ def generate(
environment,
verbose,
)

pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if output:
print_yaml(pipeline.to_yaml())
Expand Down Expand Up @@ -335,20 +365,33 @@ def deploy(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
for component in steps_to_apply:
async def deploy_runner(component: PipelineComponent):
log_action("Deploy", component)
component.deploy(dry_run)
await component.deploy(dry_run)

async def async_deploy():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

if parallel:
pipeline_tasks = get_concurrently_tasks_to_execute(
pipeline, steps, filter_type, deploy_runner
)
await pipeline_tasks
else:
steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
for component in steps_to_apply:
await deploy_runner(component)

asyncio.run(async_deploy())


@app.command(help="Destroy pipeline steps") # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8
Expand All @@ -362,19 +405,34 @@ def destroy(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
async def destroy_runner(component: PipelineComponent):
log_action("Destroy", component)
component.destroy(dry_run)
await component.destroy(dry_run)

async def async_destroy():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)

pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

if parallel:
pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, destroy_runner
)
await pipeline_tasks
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await destroy_runner(component)

asyncio.run(async_destroy())


@app.command(help="Reset pipeline steps") # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8
Expand All @@ -388,20 +446,33 @@ def reset(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
async def reset_runner(component: PipelineComponent):
log_action("Reset", component)
component.destroy(dry_run)
component.reset(dry_run)
await component.destroy(dry_run)
await component.reset(dry_run)

async def async_reset():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if parallel:
pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, reset_runner
)
await pipeline_tasks
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await reset_runner(component)

asyncio.run(async_reset())


@app.command(help="Clean pipeline steps") # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8
Expand All @@ -415,20 +486,33 @@ def clean(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
async def clean_runner(component: PipelineComponent):
log_action("Clean", component)
component.destroy(dry_run)
component.clean(dry_run)
await component.destroy(dry_run)
await component.clean(dry_run)

async def async_clean():
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if parallel:
pipeline_steps = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, clean_runner
)
await pipeline_steps
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await clean_runner(component)

asyncio.run(async_clean())


def version_callback(show_version: bool) -> None:
Expand Down
23 changes: 19 additions & 4 deletions kpops/component_handlers/helm_wrapper/helm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
import re
import subprocess
Expand Down Expand Up @@ -74,7 +75,7 @@ def add_repo(
else:
self.__execute(["helm", "repo", "update"])

def upgrade_install(
async def upgrade_install(
self,
release_name: str,
chart: str,
Expand Down Expand Up @@ -103,9 +104,9 @@ def upgrade_install(
command.extend(flags.to_command())
if dry_run:
command.append("--dry-run")
return self.__execute(command)
return await self.__async_execute(command)

def uninstall(
async def uninstall(
self,
namespace: str,
release_name: str,
Expand All @@ -122,7 +123,7 @@ def uninstall(
if dry_run:
command.append("--dry-run")
try:
return self.__execute(command)
return await self.__async_execute(command)
except ReleaseNotFoundException:
log.warning(
f"Release with name {release_name} not found. Could not uninstall app."
Expand Down Expand Up @@ -229,6 +230,20 @@ def __execute(self, command: list[str]) -> str:
log.debug(process.stdout)
return process.stdout

async def __async_execute(self, command: list[str]):
command = self.__set_global_flags(command)
log.debug(f"Executing {' '.join(command)}")
proc = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdout, stderr = await proc.communicate()
Helm.parse_helm_command_stderr_output(stderr.decode())
log.debug(stdout)
return stdout.decode()

def __set_global_flags(self, command: list[str]) -> list[str]:
if self._context:
log.debug(f"Changing the Kubernetes context to {self._context}")
Expand Down
Loading

0 comments on commit 86f8db9

Please sign in to comment.