Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate KPOps API from the CLI #489

Merged
merged 13 commits into from
May 23, 2024
2 changes: 0 additions & 2 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ $ kpops generate [OPTIONS] PIPELINE_PATH

* `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH]
* `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
* `--output / --no-output`: Enable output printing [default: output]
* `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
Expand Down Expand Up @@ -164,7 +163,6 @@ $ kpops manifest [OPTIONS] PIPELINE_PATH

* `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH]
* `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .]
* `--output / --no-output`: Enable output printing [default: output]
* `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS]
* `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE]
* `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT]
Expand Down
2 changes: 1 addition & 1 deletion hooks/gen_docs/gen_docs_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import yaml

from hooks import ROOT
from kpops.cli.registry import _find_classes
from kpops.api.registry import _find_classes
from kpops.components import KafkaConnector, PipelineComponent
from kpops.utils.colorify import redify, yellowify
from kpops.utils.pydantic import issubclass_patched
Expand Down
2 changes: 1 addition & 1 deletion kpops/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__version__ = "5.1.1"

# export public API functions
from kpops.cli.main import clean, deploy, destroy, generate, init, manifest, reset
from kpops.api import clean, deploy, destroy, generate, init, manifest, reset

__all__ = (
"generate",
Expand Down
268 changes: 268 additions & 0 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
from __future__ import annotations

import asyncio
from pathlib import Path
from typing import TYPE_CHECKING

import kpops
from kpops.api.logs import log, log_action
from kpops.api.options import FilterType
from kpops.api.registry import Registry
from kpops.component_handlers import ComponentHandlers
from kpops.component_handlers.kafka_connect.kafka_connect_handler import (
KafkaConnectHandler,
)
from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler
from kpops.component_handlers.topic.handler import TopicHandler
from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper
from kpops.config import KpopsConfig
from kpops.pipeline import (
Pipeline,
PipelineGenerator,
)
from kpops.utils.cli_commands import init_project

if TYPE_CHECKING:
from kpops.components import PipelineComponent
from kpops.components.base_components.models.resource import Resource
from kpops.config import KpopsConfig


def generate(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
verbose: bool = False,
) -> Pipeline:
kpops_config = KpopsConfig.create(
config,
dotenv,
environment,
verbose,
)
pipeline = create_pipeline(pipeline_path, kpops_config, environment)
log.info(f"Picked up pipeline '{pipeline_path.parent.name}'")
if steps:
component_names = steps
log.debug(
f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}"
)

predicate = filter_type.create_default_step_names_filter_predicate(
component_names
)
pipeline.filter(predicate)
log.info(f"Filtered pipeline:\n{pipeline.step_names}")
return pipeline


def manifest(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
verbose: bool = False,
) -> list[Resource]:
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
resources: list[Resource] = []
for component in pipeline.components:
resource = component.manifest()
resources.append(resource)
return resources


def deploy(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def deploy_runner(component: PipelineComponent):
log_action("Deploy", component)
await component.deploy(dry_run)

async def async_deploy():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(deploy_runner)
await pipeline_tasks
else:
for component in pipeline.components:
await deploy_runner(component)

asyncio.run(async_deploy())


def destroy(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def destroy_runner(component: PipelineComponent):
log_action("Destroy", component)
await component.destroy(dry_run)

async def async_destroy():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(
destroy_runner, reverse=True
)
await pipeline_tasks
else:
for component in reversed(pipeline.components):
await destroy_runner(component)

asyncio.run(async_destroy())


def reset(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

async def async_reset():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True)
await pipeline_tasks
else:
for component in reversed(pipeline.components):
await reset_runner(component)

asyncio.run(async_reset())


def clean(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
dry_run: bool = True,
verbose: bool = True,
parallel: bool = False,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

async def async_clean():
if parallel:
pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True)
await pipeline_tasks
else:
for component in reversed(pipeline.components):
await clean_runner(component)

asyncio.run(async_clean())


def init(
path: Path,
config_include_opt: bool = False,
):
if not path.exists():
path.mkdir(parents=False)
elif next(path.iterdir(), False):
log.warning("Please provide a path to an empty directory.")
return
init_project(path, config_include_opt)


def create_pipeline(
pipeline_path: Path,
kpops_config: KpopsConfig,
environment: str | None,
) -> Pipeline:
registry = Registry()
if kpops_config.components_module:
registry.find_components(kpops_config.components_module)
registry.find_components("kpops.components")

handlers = setup_handlers(kpops_config)
parser = PipelineGenerator(kpops_config, registry, handlers)
return parser.load_yaml(pipeline_path, environment)


def setup_handlers(config: KpopsConfig) -> ComponentHandlers:
schema_handler = SchemaHandler.load_schema_handler(config)
connector_handler = KafkaConnectHandler.from_kpops_config(config)
proxy_wrapper = ProxyWrapper(config.kafka_rest)
topic_handler = TopicHandler(proxy_wrapper)

return ComponentHandlers(schema_handler, connector_handler, topic_handler)
13 changes: 13 additions & 0 deletions kpops/api/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from __future__ import annotations


class ValidationError(Exception):
pass


class ParsingException(Exception):
pass


class ClassNotFoundError(Exception):
"""Similar to builtin `ModuleNotFoundError`; class doesn't exist inside module."""
24 changes: 24 additions & 0 deletions kpops/cli/custom_formatter.py → kpops/api/logs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

import typer

if TYPE_CHECKING:
from kpops.components import PipelineComponent


class CustomFormatter(logging.Formatter):
def format(self, record):
Expand All @@ -23,3 +29,21 @@ def format(self, record):
log_fmt = formats.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)


logger = logging.getLogger()
logging.getLogger("httpx").setLevel(logging.WARNING)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(CustomFormatter())
logger.addHandler(stream_handler)

log = logging.getLogger("")
LOG_DIVIDER = "#" * 100


def log_action(action: str, pipeline_component: PipelineComponent):
log.info("\n")
log.info(LOG_DIVIDER)
log.info(f"{action} {pipeline_component.name}")
log.info(LOG_DIVIDER)
log.info("\n")
29 changes: 29 additions & 0 deletions kpops/api/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from kpops.components import PipelineComponent
from kpops.pipeline import ComponentFilterPredicate


class FilterType(str, Enum):
INCLUDE = "include"
EXCLUDE = "exclude"

@staticmethod
def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool:
return component.name in component_names

def create_default_step_names_filter_predicate(
self, component_names: set[str]
) -> ComponentFilterPredicate:
def predicate(component: PipelineComponent) -> bool:
match self, FilterType.is_in_steps(component, component_names):
case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True):
return False
case _:
return True

return predicate
2 changes: 1 addition & 1 deletion kpops/cli/registry.py → kpops/api/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import TYPE_CHECKING, TypeVar

from kpops import __name__
from kpops.cli.exception import ClassNotFoundError
from kpops.api.exception import ClassNotFoundError
from kpops.components.base_components.pipeline_component import PipelineComponent

if TYPE_CHECKING:
Expand Down
2 changes: 0 additions & 2 deletions kpops/cli/exception.py

This file was deleted.

Loading
Loading