Skip to content

Commit

Permalink
Move all cli to api
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed May 21, 2024
1 parent d3d66fc commit d1d102e
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 105 deletions.
3 changes: 1 addition & 2 deletions kpops/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
__version__ = "5.0.1"

# export public API functions
from kpops.api import generate, manifest
from kpops.cli.main import clean, deploy, destroy, init, reset
from kpops.api import clean, deploy, destroy, generate, init, manifest, reset

__all__ = (
"generate",
Expand Down
181 changes: 180 additions & 1 deletion kpops/api.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,39 @@
from __future__ import annotations

import asyncio
import logging
from pathlib import Path
from typing import TYPE_CHECKING

import kpops
from kpops.cli.custom_formatter import CustomFormatter
from kpops.cli.options import FilterType
from kpops.config import KpopsConfig
from kpops.pipeline import (
Pipeline,
)
from kpops.utils.cli_commands import init_project

if TYPE_CHECKING:
from kpops.components import PipelineComponent
from kpops.components.base_components.models.resource import Resource

log = logging.getLogger("KPOpsAPI")
logger = logging.getLogger()
logging.getLogger("httpx").setLevel(logging.WARNING)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomFormatter())
logger.addHandler(stream_handler)

log = logging.getLogger("")
LOG_DIVIDER = "#" * 100


def log_action(action: str, pipeline_component: PipelineComponent):
log.info("\n")
log.info(LOG_DIVIDER)
log.info(f"{action} {pipeline_component.name}")
log.info(LOG_DIVIDER)
log.info("\n")


def parse_steps(steps: str) -> set[str]:
Expand Down Expand Up @@ -75,3 +94,163 @@ def manifest(
resource = component.manifest()
resources.append(resource)
return resources


def deploy(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: str | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def deploy_runner(component: PipelineComponent):
log_action("Deploy", component)
await component.deploy(dry_run)

async def async_deploy():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(deploy_runner)
await pipeline_tasks
else:
for component in pipeline.components:
await deploy_runner(component)

asyncio.run(async_deploy())


def destroy(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: str | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def destroy_runner(component: PipelineComponent):
log_action("Destroy", component)
await component.destroy(dry_run)

async def async_destroy():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(
destroy_runner, reverse=True
)
await pipeline_tasks
else:
for component in reversed(pipeline.components):
await destroy_runner(component)

asyncio.run(async_destroy())


def reset(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: str | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

async def async_reset():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True)
await pipeline_tasks
else:
for component in reversed(pipeline.components):
await reset_runner(component)

asyncio.run(async_reset())


def clean(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: str | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

async def async_clean():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True)
await pipeline_tasks
else:
for component in reversed(pipeline.components):
await clean_runner(component)

asyncio.run(async_clean())


def init(
path: Path,
config_include_opt: bool = False,
):
if not path.exists():
path.mkdir(parents=False)
elif next(path.iterdir(), False):
log.warning("Please provide a path to an empty directory.")
return
init_project(path, config_include_opt)
Loading

0 comments on commit d1d102e

Please sign in to comment.