Add parallelizable tasks
irux committed Oct 16, 2023
1 parent ddeedb1 commit f9bb04e
Showing 3 changed files with 79 additions and 44 deletions.
70 changes: 51 additions & 19 deletions kpops/cli/main.py
@@ -4,7 +4,8 @@
import logging
from enum import Enum
from pathlib import Path
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Awaitable, Coroutine

Check failure on line 7 in kpops/cli/main.py, GitHub Actions / Test (ubuntu-22.04, 3.10): [*] Import from `collections.abc` instead: `Awaitable`, `Coroutine`
+from collections.abc import Callable

Check failure on line 8 in kpops/cli/main.py, GitHub Actions / Test (ubuntu-22.04, 3.10): [*] Move standard library import `collections.abc.Callable` into a type-checking block

import dtyper
import typer
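
Both CI failures above come from the same family of ruff rules: on Python 3.9+, these ABCs should be imported from collections.abc rather than typing, and imports needed only for annotations belong behind TYPE_CHECKING. A lint-clean version of this import block might look like the following sketch (not what the commit actually contains):

from __future__ import annotations

import logging
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import dtyper
import typer

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable, Coroutine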
@@ -172,6 +173,26 @@ def is_in_steps(component: PipelineComponent) -> bool:
    return filtered_steps


+def get_reverse_concurrently_tasks_to_execute(
+    pipeline: Pipeline,
+    steps: str | None,
+    filter_type: FilterType,
+    runner: Callable[[PipelineComponent], Coroutine],
+) -> Awaitable:
+    steps_to_apply = reverse_pipeline_steps(pipeline, steps, filter_type)
+    return pipeline.build_execution_graph_from(list(steps_to_apply), True, runner)
+
+
+def get_concurrently_tasks_to_execute(
+    pipeline: Pipeline,
+    steps: str | None,
+    filter_type: FilterType,
+    runner: Callable[[PipelineComponent], Coroutine],
+) -> Awaitable:
+    steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
+    return pipeline.build_execution_graph_from(steps_to_apply, False, runner)


def get_steps_to_apply(
    pipeline: Pipeline, steps: str | None, filter_type: FilterType
) -> list[PipelineComponent]:
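
For context on the boolean flag threaded through these helpers: deploy has to walk the pipeline graph from producers to consumers, while destroy, reset, and clean walk it in the opposite direction so dependents are handled before the components they depend on. A minimal illustration of the two traversal orders with networkx (the node names are made up):

import networkx as nx

graph = nx.DiGraph([("producer-app", "topic"), ("topic", "consumer-app")])

print(list(nx.topological_sort(graph)))
# ['producer-app', 'topic', 'consumer-app']  -- deploy order

print(list(nx.topological_sort(graph.reverse())))
# ['consumer-app', 'topic', 'producer-app']  -- destroy/reset/clean order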
@@ -285,16 +306,19 @@ def deploy(
    dry_run: bool = DRY_RUN,
    verbose: bool = VERBOSE_OPTION,
):
+    async def deploy_runner(component: PipelineComponent):
+        await component.deploy(dry_run)
+
    async def async_deploy():
        pipeline_config = create_pipeline_config(config, defaults, verbose)
        pipeline = setup_pipeline(
            pipeline_base_dir, pipeline_path, components_module, pipeline_config
        )

-        steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type)
-        for component in steps_to_apply:
-            log_action("Deploy", component)
-            await component.deploy(dry_run)
+        pipeline_tasks = get_concurrently_tasks_to_execute(
+            pipeline, steps, filter_type, deploy_runner
+        )
+        await pipeline_tasks

    asyncio.run(async_deploy())

@@ -313,15 +337,18 @@ def destroy(
    dry_run: bool = DRY_RUN,
    verbose: bool = VERBOSE_OPTION,
):
+    async def destroy_runner(component: PipelineComponent):
+        await component.destroy(dry_run)
+
    async def async_destroy():
        pipeline_config = create_pipeline_config(config, defaults, verbose)
        pipeline = setup_pipeline(
            pipeline_base_dir, pipeline_path, components_module, pipeline_config
        )
-        pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
-        for component in pipeline_steps:
-            log_action("Destroy", component)
-            await component.destroy(dry_run)
+        pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
+            pipeline, steps, filter_type, destroy_runner
+        )
+        await pipeline_tasks

    asyncio.run(async_destroy())

@@ -340,16 +367,19 @@ def reset(
    dry_run: bool = DRY_RUN,
    verbose: bool = VERBOSE_OPTION,
):
+    async def reset_runner(component: PipelineComponent):
+        await component.destroy(dry_run)
+        await component.reset(dry_run)
+
    async def async_reset():
        pipeline_config = create_pipeline_config(config, defaults, verbose)
        pipeline = setup_pipeline(
            pipeline_base_dir, pipeline_path, components_module, pipeline_config
        )
-        pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
-        for component in pipeline_steps:
-            log_action("Reset", component)
-            await component.destroy(dry_run)
-            await component.reset(dry_run)
+        pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
+            pipeline, steps, filter_type, reset_runner
+        )
+        await pipeline_tasks

    asyncio.run(async_reset())

@@ -368,16 +398,18 @@ def clean(
    dry_run: bool = DRY_RUN,
    verbose: bool = VERBOSE_OPTION,
):
+    async def clean_runner(component: PipelineComponent):
+        await component.clean(dry_run)
+
    async def async_clean():
        pipeline_config = create_pipeline_config(config, defaults, verbose)
        pipeline = setup_pipeline(
            pipeline_base_dir, pipeline_path, components_module, pipeline_config
        )
-        pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type)
-        for component in pipeline_steps:
-            log_action("Clean", component)
-            await component.destroy(dry_run)
-            await component.clean(dry_run)
+        pipeline_tasks = get_reverse_concurrently_tasks_to_execute(
+            pipeline, steps, filter_type, clean_runner
+        )
+        await pipeline_tasks

    asyncio.run(async_clean())
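
All four commands now share the same shape: a small runner coroutine closes over the CLI flags (here dry_run) and is handed to the graph builder, which returns a single awaitable that drives the whole pipeline. A stripped-down sketch of that wiring; FakeComponent and fake_build_graph are illustrative stand-ins, not kpops APIs:

import asyncio


class FakeComponent:
    def __init__(self, name: str) -> None:
        self.name = name

    async def deploy(self, dry_run: bool) -> None:
        print(f"deploy {self.name} (dry_run={dry_run})")


def fake_build_graph(components, runner):
    # Stand-in for Pipeline.build_execution_graph_from: return one
    # awaitable that pushes every component through the runner.
    async def run_all():
        for component in components:
            await runner(component)

    return run_all()


def deploy(dry_run: bool = True) -> None:
    async def deploy_runner(component: FakeComponent) -> None:
        await component.deploy(dry_run)  # the closure captures dry_run

    async def async_deploy():
        tasks = fake_build_graph(
            [FakeComponent("producer"), FakeComponent("consumer")], deploy_runner
        )
        await tasks

    asyncio.run(async_deploy())


deploy()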

35 changes: 28 additions & 7 deletions kpops/pipeline_generator/pipeline.py
@@ -5,7 +5,7 @@
import logging
from collections import Counter
from contextlib import suppress
-from typing import TYPE_CHECKING, Optional, Awaitable
+from typing import TYPE_CHECKING, Optional, Awaitable, Coroutine

Check failure on line 8 in kpops/pipeline_generator/pipeline.py, GitHub Actions / Test (ubuntu-22.04, 3.10): [*] Import from `collections.abc` instead: `Awaitable`, `Coroutine`

import networkx as nx
import yaml
@@ -19,7 +19,7 @@
from kpops.utils.yaml_loading import load_yaml_file, substitute, substitute_nested

if TYPE_CHECKING:
-    from collections.abc import Iterator
+    from collections.abc import Iterator, Callable

Check failure on line 22 in kpops/pipeline_generator/pipeline.py, GitHub Actions / Test (ubuntu-22.04, 3.10): [*] Import block is un-sorted or un-formatted
    from pathlib import Path

    from kpops.cli.pipeline_config import PipelineConfig
@@ -80,12 +80,23 @@ def __iter__(self) -> Iterator[PipelineComponent]:
    def __len__(self) -> int:
        return len(self.components)

-    def build_deploy_graph_task(self, dry_run: bool):
-        async def run_graph_tasks(tasks: list[Awaitable]):
-            for pending_task in tasks:
+    def build_execution_graph_from(
+        self,
+        components: list[PipelineComponent],
+        reverse: bool,
+        runner: Callable[[PipelineComponent], Coroutine],
+    ):
+        async def run_graph_tasks(pending_tasks: list[Awaitable]):
+            for pending_task in pending_tasks:
                await pending_task

-        transformed_graph = self.graph.copy()
+        nodes = [node_component.id for node_component in components]
+
+        # subgraph() returns a frozen read-only view; copy it so that
+        # add_node() below can mutate the graph.
+        transformed_graph = self.graph.subgraph(nodes).copy()
+
+        if reverse:
+            transformed_graph = transformed_graph.reverse()

        root_node = "root_node_bfs"
        transformed_graph.add_node(root_node)

@@ -104,7 +115,7 @@ async def run_graph_tasks(tasks: list[Awaitable]):
                node = self._component_index[task]
                if not node.is_topic:
                    parallel_tasks.append(
-                        asyncio.create_task(node.component.deploy(dry_run))
+                        asyncio.create_task(runner(node.component))
                    )

            if parallel_tasks:
@@ -238,6 +249,16 @@ def load_from_yaml(

        return cls(main_content, env_content, registry, config, handlers)

+    # Plain (non-async) def: as an async def this would hand callers a
+    # coroutine wrapping the awaitable, forcing a double await; the CLI
+    # commands in main.py await the result exactly once.
+    def build_execution_graph_from(
+        self,
+        components: list[PipelineComponent],
+        reverse: bool,
+        runner: Callable[[PipelineComponent], Coroutine],
+    ) -> Awaitable:
+        return self.components.build_execution_graph_from(
+            components, reverse, runner
+        )

    def parse_components(self, component_list: list[dict]) -> None:
        """Instantiate, enrich and inflate a list of components.
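
The graph executor above works breadth-first: it attaches a synthetic root (root_node_bfs), walks the graph one layer at a time, and schedules every non-topic node in a layer as a concurrent asyncio task. A self-contained sketch of that layer-by-layer pattern, using nx.bfs_layers and asyncio.gather in place of the parts elided from the diff, and assuming edges only connect one layer to the next:

import asyncio

import networkx as nx


async def run_component(name: str) -> None:
    # Stand-in for runner(node.component); illustrative only.
    print(f"running {name}")


async def run_layered(graph: nx.DiGraph) -> None:
    roots = [node for node, degree in graph.in_degree() if degree == 0]
    for layer in nx.bfs_layers(graph, roots):
        # Await the whole layer before starting the next one.
        await asyncio.gather(*(run_component(node) for node in layer))


graph = nx.DiGraph([("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")])
asyncio.run(run_layered(graph))  # a; then b and c concurrently; then d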
18 changes: 0 additions & 18 deletions test.py

This file was deleted.
