Skip to content

Commit

Permalink
Execute operations in parallel (#372)
Browse files Browse the repository at this point in the history
Closes #285

---------

Co-authored-by: Ivan Yordanov <[email protected]>
  • Loading branch information
irux and sujuka99 authored Jan 15, 2024
1 parent deb384c commit bcd3865
Show file tree
Hide file tree
Showing 17 changed files with 572 additions and 124 deletions.
4 changes: 4 additions & 0 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ $ kpops clean [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops deploy`
Expand All @@ -73,6 +74,7 @@ $ kpops deploy [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops destroy`
Expand All @@ -99,6 +101,7 @@ $ kpops destroy [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops generate`
Expand Down Expand Up @@ -175,6 +178,7 @@ $ kpops reset [OPTIONS] PIPELINE_PATH
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Run the command in parallel [default: no-parallel]
* `--help`: Show this message and exit.

## `kpops schema`
Expand Down
171 changes: 119 additions & 52 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import logging
from collections.abc import Iterator
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Optional
Expand Down Expand Up @@ -33,6 +32,8 @@
from kpops.utils.yaml import print_yaml

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine, Iterator

from kpops.components.base_components import PipelineComponent


Expand Down Expand Up @@ -94,6 +95,13 @@
)


PARALLEL: bool = typer.Option(
False,
"--parallel/--no-parallel",
help="Run the command in parallel",
)


class FilterType(str, Enum):
INCLUDE = "include"
EXCLUDE = "exclude"
Expand Down Expand Up @@ -184,6 +192,26 @@ def is_in_steps(component: PipelineComponent) -> bool:
return filtered_steps


def get_reverse_concurrently_tasks_to_execute(
pipeline: Pipeline,
steps: str | None,
filter_type: FilterType,
runner: Callable[[PipelineComponent], Coroutine],
) -> Awaitable:
steps_to_apply = reverse_pipeline_steps(pipeline, steps, filter_type)
return pipeline.build_execution_graph_from(list(steps_to_apply), True, runner)


def get_concurrently_tasks_to_execute(
pipeline: Pipeline,
steps: str | None,
filter_type: FilterType,
runner: Callable[[PipelineComponent], Coroutine],
) -> Awaitable:
steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
return pipeline.build_execution_graph_from(steps_to_apply, False, runner)


def get_steps_to_apply(
pipeline: Pipeline, steps: str | None, filter_type: FilterType
) -> list[PipelineComponent]:
Expand Down Expand Up @@ -284,6 +312,7 @@ def generate(
environment,
verbose,
)

pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if output:
print_yaml(pipeline.to_yaml())
Expand Down Expand Up @@ -336,22 +365,31 @@ def deploy(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
async def deploy_runner(component: PipelineComponent):
log_action("Deploy", component)
await component.deploy(dry_run)

async def async_deploy():
for component in steps_to_apply:
log_action("Deploy", component)
await component.deploy(dry_run)
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

if parallel:
pipeline_tasks = get_concurrently_tasks_to_execute(
pipeline, steps, filter_type, deploy_runner
)
await pipeline_tasks
else:
steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
for component in steps_to_apply:
await deploy_runner(component)

asyncio.run(async_deploy())

Expand All @@ -367,21 +405,32 @@ def destroy(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
async def destroy_runner(component: PipelineComponent):
log_action("Destroy", component)
await component.destroy(dry_run)

async def async_destroy():
for component in pipeline_steps:
log_action("Destroy", component)
await component.destroy(dry_run)
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)

pipeline = setup_pipeline(pipeline_path, kpops_config, environment)

if parallel:
pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, destroy_runner
)
await pipeline_tasks
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await destroy_runner(component)

asyncio.run(async_destroy())

Expand All @@ -397,22 +446,31 @@ def reset(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
async def reset_runner(component: PipelineComponent):
log_action("Reset", component)
await component.destroy(dry_run)
await component.reset(dry_run)

async def async_reset():
for component in pipeline_steps:
log_action("Reset", component)
await component.destroy(dry_run)
await component.reset(dry_run)
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if parallel:
pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, reset_runner
)
await pipeline_tasks
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await reset_runner(component)

asyncio.run(async_reset())

Expand All @@ -428,22 +486,31 @@ def clean(
environment: Optional[str] = ENVIRONMENT,
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
async def clean_runner(component: PipelineComponent):
log_action("Clean", component)
await component.destroy(dry_run)
await component.clean(dry_run)

async def async_clean():
for component in pipeline_steps:
log_action("Clean", component)
await component.destroy(dry_run)
await component.clean(dry_run)
kpops_config = create_kpops_config(
config,
defaults,
dotenv,
environment,
verbose,
)
pipeline = setup_pipeline(pipeline_path, kpops_config, environment)
if parallel:
pipeline_steps = get_reverse_concurrently_tasks_to_execute(
pipeline, steps, filter_type, clean_runner
)
await pipeline_steps
else:
pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
for component in pipeline_steps:
await clean_runner(component)

asyncio.run(async_clean())

Expand Down
23 changes: 19 additions & 4 deletions kpops/component_handlers/helm_wrapper/helm.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import logging
import re
import subprocess
Expand Down Expand Up @@ -74,7 +75,7 @@ def add_repo(
else:
self.__execute(["helm", "repo", "update"])

def upgrade_install(
async def upgrade_install(
self,
release_name: str,
chart: str,
Expand Down Expand Up @@ -103,9 +104,9 @@ def upgrade_install(
command.extend(flags.to_command())
if dry_run:
command.append("--dry-run")
return self.__execute(command)
return await self.__async_execute(command)

def uninstall(
async def uninstall(
self,
namespace: str,
release_name: str,
Expand All @@ -122,7 +123,7 @@ def uninstall(
if dry_run:
command.append("--dry-run")
try:
return self.__execute(command)
return await self.__async_execute(command)
except ReleaseNotFoundException:
log.warning(
f"Release with name {release_name} not found. Could not uninstall app."
Expand Down Expand Up @@ -229,6 +230,20 @@ def __execute(self, command: list[str]) -> str:
log.debug(process.stdout)
return process.stdout

async def __async_execute(self, command: list[str]):
command = self.__set_global_flags(command)
log.debug(f"Executing {' '.join(command)}")
proc = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)

stdout, stderr = await proc.communicate()
Helm.parse_helm_command_stderr_output(stderr.decode())
log.debug(stdout)
return stdout.decode()

def __set_global_flags(self, command: list[str]) -> list[str]:
if self._context:
log.debug(f"Changing the Kubernetes context to {self._context}")
Expand Down
4 changes: 2 additions & 2 deletions kpops/components/base_components/helm_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def deploy_flags(self) -> HelmUpgradeInstallFlags:

@override
async def deploy(self, dry_run: bool) -> None:
stdout = self.helm.upgrade_install(
stdout = await self.helm.upgrade_install(
self.helm_release_name,
self.helm_chart,
dry_run,
Expand All @@ -165,7 +165,7 @@ async def deploy(self, dry_run: bool) -> None:

@override
async def destroy(self, dry_run: bool) -> None:
stdout = self.helm.uninstall(
stdout = await self.helm.uninstall(
self.namespace,
self.helm_release_name,
dry_run,
Expand Down
Loading

0 comments on commit bcd3865

Please sign in to comment.