From 8c72d7e9f103ecc1c5312769b23ad97719abd9cd Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 14:57:19 +0200 Subject: [PATCH 01/10] Separate KPOps API from the CLI --- docs/docs/user/references/cli-commands.md | 1 - kpops/__init__.py | 3 +- kpops/api.py | 47 +++++++ kpops/cli/main.py | 121 +++--------------- kpops/cli/options.py | 23 ++++ .../streams_bootstrap/streams/model.py | 2 +- kpops/config.py | 20 +++ kpops/exception.py | 9 ++ kpops/pipeline.py | 50 ++++++-- tests/cli/test_handlers.py | 20 +-- tests/components/test_streams_app.py | 2 +- tests/pipeline/test_generate.py | 13 +- tests/pipeline/test_pipeline.py | 17 +-- 13 files changed, 179 insertions(+), 149 deletions(-) create mode 100644 kpops/api.py create mode 100644 kpops/exception.py diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 3c46cb11b..7225c89ad 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -120,7 +120,6 @@ $ kpops generate [OPTIONS] PIPELINE_PATH * `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH] * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] -* `--output / --no-output`: Enable output printing [default: output] * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS] * `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE] * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). 
[env var: KPOPS_ENVIRONMENT] diff --git a/kpops/__init__.py b/kpops/__init__.py index 34ff74a89..474b65c61 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,7 +1,8 @@ __version__ = "5.0.1" # export public API functions -from kpops.cli.main import clean, deploy, destroy, generate, init, manifest, reset +from kpops.api import generate +from kpops.cli.main import clean, deploy, destroy, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api.py b/kpops/api.py new file mode 100644 index 000000000..7c8190699 --- /dev/null +++ b/kpops/api.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import logging +from pathlib import Path + +from kpops.cli.options import FilterType +from kpops.config import KpopsConfig +from kpops.pipeline import ( + Pipeline, +) + +log = logging.getLogger("KPOpsAPI") + + +def parse_steps(steps: str) -> set[str]: + return set(steps.split(",")) + + +def generate( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> Pipeline: + kpops_config = KpopsConfig.create( + config, + dotenv, + environment, + verbose, + ) + pipeline = Pipeline.create(pipeline_path, kpops_config, environment) + log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") + if steps: + component_names = parse_steps(steps) + log.debug( + f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" + ) + + predicate = filter_type.create_default_step_names_filter_predicate( + component_names + ) + pipeline.filter(predicate) + log.info(f"Filtered pipeline:\n{pipeline.step_names}") + return pipeline diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 6bdcd8dc4..911071bcd 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -8,20 +8,12 @@ import dtyper import typer +import kpops from kpops import __version__ from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType -from kpops.cli.registry import Registry -from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) -from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler -from kpops.component_handlers.topic.handler import TopicHandler -from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.components.base_components.models.resource import Resource from kpops.config import ENV_PREFIX, KpopsConfig -from kpops.pipeline import ComponentFilterPredicate, Pipeline, PipelineGenerator from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( SchemaScope, @@ -29,7 +21,6 @@ gen_defaults_schema, gen_pipeline_schema, ) -from kpops.utils.pydantic import YamlConfigSettingsSource from kpops.utils.yaml import print_yaml if TYPE_CHECKING: @@ -135,55 +126,10 @@ log = logging.getLogger("") -def setup_pipeline( - pipeline_path: Path, - kpops_config: KpopsConfig, - environment: str | None, -) -> Pipeline: - registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") - - handlers = setup_handlers(kpops_config) - parser = PipelineGenerator(kpops_config, registry, handlers) - return parser.load_yaml(pipeline_path, environment) - - -def setup_handlers(config: KpopsConfig) -> ComponentHandlers: - schema_handler = 
SchemaHandler.load_schema_handler(config) - connector_handler = KafkaConnectHandler.from_kpops_config(config) - proxy_wrapper = ProxyWrapper(config.kafka_rest) - topic_handler = TopicHandler(proxy_wrapper) - - return ComponentHandlers(schema_handler, connector_handler, topic_handler) - - -def setup_logging_level(verbose: bool): - logging.getLogger().setLevel(logging.DEBUG if verbose else logging.INFO) - - def parse_steps(steps: str) -> set[str]: return set(steps.split(",")) -def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool: - return component.name in component_names - - -def create_default_step_names_filter_predicate( - component_names: set[str], filter_type: FilterType -) -> ComponentFilterPredicate: - def predicate(component: PipelineComponent) -> bool: - match filter_type, is_in_steps(component, component_names): - case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True): - return False - case _: - return True - - return predicate - - def log_action(action: str, pipeline_component: PipelineComponent): log.info("\n") log.info(LOG_DIVIDER) @@ -192,20 +138,6 @@ def log_action(action: str, pipeline_component: PipelineComponent): log.info("\n") -def create_kpops_config( - config: Path, - dotenv: list[Path] | None = None, - environment: str | None = None, - verbose: bool = False, -) -> KpopsConfig: - setup_logging_level(verbose) - YamlConfigSettingsSource.config_dir = config - YamlConfigSettingsSource.environment = environment - return KpopsConfig( - _env_file=dotenv # pyright: ignore[reportCallIssue] - ) - - @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 help="Initialize a new KPOps project." ) @@ -246,12 +178,12 @@ def schema( ) -> None: match scope: case SchemaScope.PIPELINE: - kpops_config = create_kpops_config(config) + kpops_config = KpopsConfig.create(config) gen_pipeline_schema( kpops_config.components_module, include_stock_components ) case SchemaScope.DEFAULTS: - kpops_config = create_kpops_config(config) + kpops_config = KpopsConfig.create(config) gen_defaults_schema( kpops_config.components_module, include_stock_components ) @@ -267,39 +199,21 @@ def generate( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, - output: bool = OUTPUT_OPTION, steps: Optional[str] = PIPELINE_STEPS, filter_type: FilterType = FILTER_TYPE, environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, -) -> Pipeline: - kpops_config = create_kpops_config( - config, +): + pipeline = kpops.generate( + pipeline_path, dotenv, + config, + steps, + filter_type, environment, verbose, ) - - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - - if steps: - component_names = parse_steps(steps) - log.debug( - f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" - ) - - predicate = create_default_step_names_filter_predicate( - component_names, filter_type - ) - pipeline.filter(predicate) - - def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]: - return [step.name for step in steps_to_apply] - - log.info(f"Filtered pipeline:\n{get_step_names(pipeline.components)}") - if output: - print_yaml(pipeline.to_yaml()) - return pipeline + print_yaml(pipeline.to_yaml()) @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 @@ -316,11 +230,10 @@ def manifest( environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, ) -> 
list[Resource]: - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -348,11 +261,10 @@ def deploy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -386,11 +298,10 @@ def destroy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -426,11 +337,10 @@ def reset( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -465,11 +375,10 @@ def clean( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, diff --git a/kpops/cli/options.py b/kpops/cli/options.py index ac176d986..dc116bd35 100644 --- a/kpops/cli/options.py +++ b/kpops/cli/options.py @@ -1,6 +1,29 @@ +from __future__ import annotations + from enum import Enum +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from kpops.components import PipelineComponent + from kpops.pipeline import ComponentFilterPredicate class FilterType(str, Enum): INCLUDE = "include" EXCLUDE = "exclude" + + @staticmethod + def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool: + return component.name in component_names + + def create_default_step_names_filter_predicate( + self, component_names: set[str] + ) -> ComponentFilterPredicate: + def predicate(component: PipelineComponent) -> bool: + match self, FilterType.is_in_steps(component, component_names): + case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True): + return False + case _: + return True + + return predicate diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index df2348773..97394259a 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -10,7 +10,7 @@ KafkaStreamsConfig, ) from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr -from kpops.pipeline import ValidationError +from kpops.exception import ValidationError from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/config.py b/kpops/config.py index f82068c96..b4a482f6b 100644 --- a/kpops/config.py +++ b/kpops/config.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from pathlib import Path from pydantic import AnyHttpUrl, Field, TypeAdapter @@ -127,6 +128,25 @@ class KpopsConfig(BaseSettings): model_config = SettingsConfigDict(env_prefix=ENV_PREFIX, env_nested_delimiter="__") + @classmethod + def create( + cls, + config: Path, + dotenv: list[Path] | None = None, + environment: str | None = None, + verbose: bool = False, + ) -> KpopsConfig: + cls.setup_logging_level(verbose) + YamlConfigSettingsSource.config_dir = config + 
YamlConfigSettingsSource.environment = environment + return KpopsConfig( + _env_file=dotenv # pyright: ignore[reportCallIssue] + ) + + @staticmethod + def setup_logging_level(verbose: bool): + logging.getLogger().setLevel(logging.DEBUG if verbose else logging.INFO) + @override @classmethod def settings_customise_sources( diff --git a/kpops/exception.py b/kpops/exception.py new file mode 100644 index 000000000..713274f3a --- /dev/null +++ b/kpops/exception.py @@ -0,0 +1,9 @@ +from __future__ import annotations + + +class ValidationError(Exception): + pass + + +class ParsingException(Exception): + pass diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 3f4f698b9..a218f335e 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -15,7 +15,16 @@ computed_field, ) +from kpops.cli.registry import Registry +from kpops.component_handlers import ComponentHandlers +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) +from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler +from kpops.component_handlers.topic.handler import TopicHandler +from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.exception import ParsingException, ValidationError from kpops.utils.dict_ops import update_nested_pair from kpops.utils.environment import ENV, PIPELINE_PATH from kpops.utils.yaml import load_yaml_file @@ -24,21 +33,10 @@ from collections.abc import Awaitable, Coroutine, Iterator from pathlib import Path - from kpops.cli.registry import Registry - from kpops.component_handlers import ComponentHandlers from kpops.config import KpopsConfig log = logging.getLogger("PipelineGenerator") - -class ParsingException(Exception): - pass - - -class ValidationError(Exception): - pass - - ComponentFilterPredicate: TypeAlias = Callable[[PipelineComponent], bool] @@ -50,6 +48,36 @@ class Pipeline(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) + @classmethod + def create( + cls, + pipeline_path: Path, + kpops_config: KpopsConfig, + environment: str | None, + ) -> Pipeline: + registry = Registry() + if kpops_config.components_module: + registry.find_components(kpops_config.components_module) + registry.find_components("kpops.components") + + handlers = cls.setup_handlers(kpops_config) + parser = PipelineGenerator(kpops_config, registry, handlers) + return parser.load_yaml(pipeline_path, environment) + + @staticmethod + def setup_handlers(config: KpopsConfig) -> ComponentHandlers: + schema_handler = SchemaHandler.load_schema_handler(config) + connector_handler = KafkaConnectHandler.from_kpops_config(config) + proxy_wrapper = ProxyWrapper(config.kafka_rest) + topic_handler = TopicHandler(proxy_wrapper) + + return ComponentHandlers(schema_handler, connector_handler, topic_handler) + + @computed_field(title="Step Names") + @property + def step_names(self) -> list[str]: + return [step.name for step in self.components] + @computed_field(title="Components") @property def components(self) -> list[SerializeAsAny[PipelineComponent]]: diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index 7ba0651a5..f873f6731 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -1,6 +1,5 @@ from pytest_mock import MockerFixture -from kpops.cli.main import setup_handlers from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( 
KafkaConnectHandler, @@ -8,6 +7,7 @@ from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from kpops.config import KpopsConfig, SchemaRegistryConfig +from kpops.pipeline import Pipeline from tests.cli.resources.custom_module import CustomSchemaProvider MODULE = CustomSchemaProvider.__module__ @@ -18,12 +18,12 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): kafka_brokers="broker:9092", components_module=MODULE, ) - connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") - wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") + topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") + wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -33,7 +33,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = setup_handlers(config) + actual_handlers = Pipeline.setup_handlers(config) connector_handler_mock.from_kpops_config.assert_called_once_with(config) @@ -51,16 +51,16 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): schema_registry=SchemaRegistryConfig(enabled=True), kafka_brokers="broker:9092", ) - schema_handler_mock = mocker.patch("kpops.cli.main.SchemaHandler") + schema_handler_mock = mocker.patch("kpops.pipeline.SchemaHandler") schema_handler = SchemaHandler.load_schema_handler(config) schema_handler_mock.load_schema_handler.return_value = schema_handler - connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") - wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") + topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") + wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -70,7 +70,7 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = setup_handlers(config) + actual_handlers = Pipeline.setup_handlers(config) schema_handler_mock.load_schema_handler.assert_called_once_with(config) diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 7af0634d7..d6a0f0d87 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -29,7 +29,7 @@ StreamsAppCleaner, ) from kpops.config import KpopsConfig, TopicNameConfig -from kpops.pipeline import ValidationError +from kpops.exception import ValidationError from tests.components import PIPELINE_BASE_DIR RESOURCES_PATH = Path(__file__).parent / "resources" diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 8b237f705..9ce6240fd 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -12,7 +12,7 @@ import kpops from kpops.cli.main import FilterType, 
app from kpops.components import KafkaSinkConnector, PipelineComponent -from kpops.pipeline import ParsingException, ValidationError +from kpops.exception import ParsingException, ValidationError runner = CliRunner() @@ -23,12 +23,11 @@ class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: - return mocker.patch("kpops.cli.main.log.info") + return mocker.patch("kpops.api.log.info") def test_python_api(self): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, ) assert len(pipeline) == 3 assert [component.type for component in pipeline.components] == [ @@ -40,25 +39,25 @@ def test_python_api(self): def test_python_api_filter_include(self, log_info: MagicMock): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, steps="converter", filter_type=FilterType.INCLUDE, ) assert len(pipeline) == 1 assert pipeline.components[0].type == "converter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call("Filtered pipeline:\n['converter']") def test_python_api_filter_exclude(self, log_info: MagicMock): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, steps="converter,scheduled-producer", filter_type=FilterType.EXCLUDE, ) assert len(pipeline) == 1 assert pipeline.components[0].type == "filter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call( "Filtered pipeline:\n['a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name']" ) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index d0c53667b..6f79c8f37 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -3,7 +3,6 @@ import pytest from polyfactory.factories.pydantic_factory import ModelFactory -from kpops.cli.main import create_default_step_names_filter_predicate from kpops.cli.options import FilterType from kpops.component_handlers import ( ComponentHandlers, @@ -44,8 +43,8 @@ def pipeline(self) -> Pipeline: return pipeline def test_filter_include(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - {"example2", "example3"}, FilterType.INCLUDE + predicate = FilterType.INCLUDE.create_default_step_names_filter_predicate( + {"example2", "example3"} ) pipeline.filter(predicate) assert len(pipeline.components) == 2 @@ -53,23 +52,19 @@ def test_filter_include(self, pipeline: Pipeline): assert test_component_3 in pipeline.components def test_filter_include_empty(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - set(), FilterType.INCLUDE - ) + predicate = FilterType.INCLUDE.create_default_step_names_filter_predicate(set()) pipeline.filter(predicate) assert len(pipeline.components) == 0 def test_filter_exclude(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - {"example2", "example3"}, FilterType.EXCLUDE + predicate = FilterType.EXCLUDE.create_default_step_names_filter_predicate( + {"example2", "example3"} ) pipeline.filter(predicate) assert len(pipeline.components) == 1 assert test_component_1 in pipeline.components def test_filter_exclude_empty(self, pipeline: Pipeline): - predicate = 
create_default_step_names_filter_predicate( - set(), FilterType.EXCLUDE - ) + predicate = FilterType.EXCLUDE.create_default_step_names_filter_predicate(set()) pipeline.filter(predicate) assert len(pipeline.components) == 3 From d3d66fcee0c35aacde527e64c7e81c20f42574d9 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 16:00:02 +0200 Subject: [PATCH 02/10] Separate KPOps API from the CLI --- docs/docs/user/references/cli-commands.md | 1 - kpops/__init__.py | 4 +-- kpops/api.py | 30 +++++++++++++++++++ kpops/cli/main.py | 16 +++------- .../base_components/models/resource.py | 4 +-- tests/pipeline/test_manifest.py | 1 - 6 files changed, 38 insertions(+), 18 deletions(-) diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 7225c89ad..62d940123 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -163,7 +163,6 @@ $ kpops manifest [OPTIONS] PIPELINE_PATH * `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH] * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] -* `--output / --no-output`: Enable output printing [default: output] * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS] * `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE] * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] diff --git a/kpops/__init__.py b/kpops/__init__.py index 474b65c61..af246bb97 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,8 +1,8 @@ __version__ = "5.0.1" # export public API functions -from kpops.api import generate -from kpops.cli.main import clean, deploy, destroy, init, manifest, reset +from kpops.api import generate, manifest +from kpops.cli.main import clean, deploy, destroy, init, reset __all__ = ( "generate", diff --git a/kpops/api.py b/kpops/api.py index 7c8190699..37af128b9 100644 --- a/kpops/api.py +++ b/kpops/api.py @@ -2,13 +2,18 @@ import logging from pathlib import Path +from typing import TYPE_CHECKING +import kpops from kpops.cli.options import FilterType from kpops.config import KpopsConfig from kpops.pipeline import ( Pipeline, ) +if TYPE_CHECKING: + from kpops.components.base_components.models.resource import Resource + log = logging.getLogger("KPOpsAPI") @@ -45,3 +50,28 @@ def generate( pipeline.filter(predicate) log.info(f"Filtered pipeline:\n{pipeline.step_names}") return pipeline + + +def manifest( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> list[Resource]: + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + resources: list[Resource] = [] + for component in pipeline.components: + resource = component.manifest() + resources.append(resource) + return resources diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 911071bcd..425a0c00b 100644 --- a/kpops/cli/main.py +++ 
b/kpops/cli/main.py @@ -12,7 +12,6 @@ from kpops import __version__ from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType -from kpops.components.base_components.models.resource import Resource from kpops.config import ENV_PREFIX, KpopsConfig from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( @@ -224,13 +223,12 @@ def manifest( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, - output: bool = OUTPUT_OPTION, steps: Optional[str] = PIPELINE_STEPS, filter_type: FilterType = FILTER_TYPE, environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, -) -> list[Resource]: - pipeline = kpops.generate( +): + resources = kpops.manifest( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -239,14 +237,8 @@ def manifest( environment=environment, verbose=verbose, ) - resources: list[Resource] = [] - for component in pipeline.components: - resource = component.manifest() - resources.append(resource) - if output: - for manifest in resource: - print_yaml(manifest) - return resources + for rendered in resources: + print_yaml(rendered) @app.command(help="Deploy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 diff --git a/kpops/components/base_components/models/resource.py b/kpops/components/base_components/models/resource.py index 08c01f344..fc0eff7a1 100644 --- a/kpops/components/base_components/models/resource.py +++ b/kpops/components/base_components/models/resource.py @@ -1,5 +1,5 @@ -from collections.abc import Mapping, Sequence +from collections.abc import Mapping from typing import Any, TypeAlias # representation of final resource for component, e.g. 
a list of Kubernetes manifests -Resource: TypeAlias = Sequence[Mapping[str, Any]] +Resource: TypeAlias = Mapping[str, Any] diff --git a/tests/pipeline/test_manifest.py b/tests/pipeline/test_manifest.py index 8dc58ae93..d12dc9172 100644 --- a/tests/pipeline/test_manifest.py +++ b/tests/pipeline/test_manifest.py @@ -101,7 +101,6 @@ def test_custom_config(self, mock_execute: MagicMock): def test_python_api(self, snapshot: Snapshot): resources = kpops.manifest( RESOURCE_PATH / "custom-config/pipeline.yaml", - output=False, environment="development", ) assert isinstance(resources, list) From d1d102e8f9dfcbc6d26d208757459f1944c58f14 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 16:18:29 +0200 Subject: [PATCH 03/10] Move all cli to api --- kpops/__init__.py | 3 +- kpops/api.py | 181 +++++++++++++++++++++++++++++++++++++++++++++- kpops/cli/main.py | 116 ++++------------------------- 3 files changed, 195 insertions(+), 105 deletions(-) diff --git a/kpops/__init__.py b/kpops/__init__.py index af246bb97..ab41f32b2 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,8 +1,7 @@ __version__ = "5.0.1" # export public API functions -from kpops.api import generate, manifest -from kpops.cli.main import clean, deploy, destroy, init, reset +from kpops.api import clean, deploy, destroy, generate, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api.py b/kpops/api.py index 37af128b9..c85ced1a0 100644 --- a/kpops/api.py +++ b/kpops/api.py @@ -1,20 +1,39 @@ from __future__ import annotations +import asyncio import logging from pathlib import Path from typing import TYPE_CHECKING import kpops +from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType from kpops.config import KpopsConfig from kpops.pipeline import ( Pipeline, ) +from kpops.utils.cli_commands import init_project if TYPE_CHECKING: + from kpops.components import PipelineComponent from kpops.components.base_components.models.resource import Resource -log = logging.getLogger("KPOpsAPI") +logger = logging.getLogger() +logging.getLogger("httpx").setLevel(logging.WARNING) +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(CustomFormatter()) +logger.addHandler(stream_handler) + +log = logging.getLogger("") +LOG_DIVIDER = "#" * 100 + + +def log_action(action: str, pipeline_component: PipelineComponent): + log.info("\n") + log.info(LOG_DIVIDER) + log.info(f"{action} {pipeline_component.name}") + log.info(LOG_DIVIDER) + log.info("\n") def parse_steps(steps: str) -> set[str]: @@ -75,3 +94,163 @@ def manifest( resource = component.manifest() resources.append(resource) return resources + + +def deploy( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def deploy_runner(component: PipelineComponent): + log_action("Deploy", component) + await component.deploy(dry_run) + + async def async_deploy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(deploy_runner) + await pipeline_tasks + else: + for component in pipeline.components: + await deploy_runner(component) + + asyncio.run(async_deploy()) + + +def destroy( + pipeline_path: Path, + 
dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def destroy_runner(component: PipelineComponent): + log_action("Destroy", component) + await component.destroy(dry_run) + + async def async_destroy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph( + destroy_runner, reverse=True + ) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await destroy_runner(component) + + asyncio.run(async_destroy()) + + +def reset( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def reset_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Reset", component) + await component.reset(dry_run) + + async def async_reset(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await reset_runner(component) + + asyncio.run(async_reset()) + + +def clean( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def clean_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Clean", component) + await component.clean(dry_run) + + async def async_clean(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await clean_runner(component) + + asyncio.run(async_clean()) + + +def init( + path: Path, + config_include_opt: bool = False, +): + if not path.exists(): + path.mkdir(parents=False) + elif next(path.iterdir(), False): + log.warning("Please provide a path to an empty directory.") + return + init_project(path, config_include_opt) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 425a0c00b..a364b3a5b 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -1,19 +1,15 @@ from __future__ import annotations -import asyncio -import logging from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import Optional import dtyper import typer import kpops from kpops import __version__ -from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType from kpops.config import ENV_PREFIX, KpopsConfig -from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( SchemaScope, 
gen_config_schema, @@ -22,12 +18,6 @@ ) from kpops.utils.yaml import print_yaml -if TYPE_CHECKING: - from kpops.components import PipelineComponent - - -LOG_DIVIDER = "#" * 100 - app = dtyper.Typer(pretty_exceptions_enable=False) DOTENV_PATH_OPTION: Optional[list[Path]] = typer.Option( @@ -116,27 +106,6 @@ ) -logger = logging.getLogger() -logging.getLogger("httpx").setLevel(logging.WARNING) -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(CustomFormatter()) -logger.addHandler(stream_handler) - -log = logging.getLogger("") - - -def parse_steps(steps: str) -> set[str]: - return set(steps.split(",")) - - -def log_action(action: str, pipeline_component: PipelineComponent): - log.info("\n") - log.info(LOG_DIVIDER) - log.info(f"{action} {pipeline_component.name}") - log.info(LOG_DIVIDER) - log.info("\n") - - @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 help="Initialize a new KPOps project." ) @@ -144,12 +113,7 @@ def init( path: Path = PROJECT_PATH, config_include_opt: bool = CONFIG_INCLUDE_OPTIONAL, ): - if not path.exists(): - path.mkdir(parents=False) - elif next(path.iterdir(), False): - log.warning("Please provide a path to an empty directory.") - return - init_project(path, config_include_opt) + kpops.init(path, config_include_opt=config_include_opt) @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 @@ -253,30 +217,18 @@ def deploy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.deploy( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def deploy_runner(component: PipelineComponent): - log_action("Deploy", component) - await component.deploy(dry_run) - - async def async_deploy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(deploy_runner) - await pipeline_tasks - else: - for component in pipeline.components: - await deploy_runner(component) - - asyncio.run(async_deploy()) - @app.command(help="Destroy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 def destroy( @@ -290,32 +242,18 @@ def destroy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.destroy( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def destroy_runner(component: PipelineComponent): - log_action("Destroy", component) - await component.destroy(dry_run) - - async def async_destroy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph( - destroy_runner, reverse=True - ) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await destroy_runner(component) - - asyncio.run(async_destroy()) - @app.command(help="Reset pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 def reset( @@ -329,31 +267,18 @@ def reset( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.reset( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def reset_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Reset", component) 
- await component.reset(dry_run) - - async def async_reset(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await reset_runner(component) - - asyncio.run(async_reset()) - @app.command(help="Clean pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 def clean( @@ -367,31 +292,18 @@ def clean( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.clean( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def clean_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Clean", component) - await component.clean(dry_run) - - async def async_clean(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await clean_runner(component) - - asyncio.run(async_clean()) - def version_callback(show_version: bool) -> None: if show_version: From 3172bdb85b89dbb5f09f86da502410c79653d298 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 16:26:53 +0200 Subject: [PATCH 04/10] Fix pyright errors --- kpops/cli/main.py | 5 +++-- kpops/components/base_components/models/resource.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index a364b3a5b..d15934e15 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -201,8 +201,9 @@ def manifest( environment=environment, verbose=verbose, ) - for rendered in resources: - print_yaml(rendered) + for resource in resources: + for rendered_manifest in resource: + print_yaml(rendered_manifest) @app.command(help="Deploy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 diff --git a/kpops/components/base_components/models/resource.py b/kpops/components/base_components/models/resource.py index fc0eff7a1..08c01f344 100644 --- a/kpops/components/base_components/models/resource.py +++ b/kpops/components/base_components/models/resource.py @@ -1,5 +1,5 @@ -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from typing import Any, TypeAlias # representation of final resource for component, e.g. 
a list of Kubernetes manifests -Resource: TypeAlias = Mapping[str, Any] +Resource: TypeAlias = Sequence[Mapping[str, Any]] From 14bfc2c29f686fba58f326cd09d195348dc1d9fa Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 11:00:32 +0200 Subject: [PATCH 05/10] address changes --- hooks/gen_docs/gen_docs_components.py | 2 +- kpops/__init__.py | 2 +- kpops/api/__init__.py | 0 kpops/{ => api}/api.py | 40 +++++++++++++++++-- kpops/{cli => api}/custom_formatter.py | 0 kpops/{ => api}/exception.py | 4 ++ kpops/{cli => api}/options.py | 0 kpops/{cli => api}/registry.py | 2 +- kpops/cli/exception.py | 2 - kpops/cli/main.py | 2 +- .../schema_handler/schema_handler.py | 4 +- .../streams_bootstrap/streams/model.py | 2 +- kpops/pipeline.py | 36 +---------------- kpops/utils/gen_schema.py | 2 +- tests/cli/test_handlers.py | 22 +++++----- tests/cli/test_registry.py | 3 +- tests/cli/test_schema_generation.py | 2 +- tests/components/test_streams_app.py | 2 +- tests/pipeline/test_generate.py | 4 +- tests/pipeline/test_pipeline.py | 2 +- 20 files changed, 70 insertions(+), 63 deletions(-) create mode 100644 kpops/api/__init__.py rename kpops/{ => api}/api.py (82%) rename kpops/{cli => api}/custom_formatter.py (100%) rename kpops/{ => api}/exception.py (50%) rename kpops/{cli => api}/options.py (100%) rename kpops/{cli => api}/registry.py (97%) delete mode 100644 kpops/cli/exception.py diff --git a/hooks/gen_docs/gen_docs_components.py b/hooks/gen_docs/gen_docs_components.py index 2e0a4f1a0..10bb40af7 100644 --- a/hooks/gen_docs/gen_docs_components.py +++ b/hooks/gen_docs/gen_docs_components.py @@ -8,7 +8,7 @@ import yaml from hooks import ROOT -from kpops.cli.registry import _find_classes +from kpops.api.registry import _find_classes from kpops.components import KafkaConnector, PipelineComponent from kpops.utils.colorify import redify, yellowify from kpops.utils.pydantic import issubclass_patched diff --git a/kpops/__init__.py b/kpops/__init__.py index 778df97f8..ccb7bb4ce 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,7 +1,7 @@ __version__ = "5.1.0" # export public API functions -from kpops.api import clean, deploy, destroy, generate, init, manifest, reset +from kpops.api.api import clean, deploy, destroy, generate, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kpops/api.py b/kpops/api/api.py similarity index 82% rename from kpops/api.py rename to kpops/api/api.py index c85ced1a0..5903e6f21 100644 --- a/kpops/api.py +++ b/kpops/api/api.py @@ -6,17 +6,27 @@ from typing import TYPE_CHECKING import kpops -from kpops.cli.custom_formatter import CustomFormatter -from kpops.cli.options import FilterType +from kpops.api.custom_formatter import CustomFormatter +from kpops.api.options import FilterType +from kpops.api.registry import Registry +from kpops.component_handlers import ComponentHandlers +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) +from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler +from kpops.component_handlers.topic.handler import TopicHandler +from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.config import KpopsConfig from kpops.pipeline import ( Pipeline, + PipelineGenerator, ) from kpops.utils.cli_commands import init_project if TYPE_CHECKING: from kpops.components import PipelineComponent from 
kpops.components.base_components.models.resource import Resource + from kpops.config import KpopsConfig logger = logging.getLogger() logging.getLogger("httpx").setLevel(logging.WARNING) @@ -55,7 +65,7 @@ def generate( environment, verbose, ) - pipeline = Pipeline.create(pipeline_path, kpops_config, environment) + pipeline = create_pipeline(pipeline_path, kpops_config, environment) log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") if steps: component_names = parse_steps(steps) @@ -254,3 +264,27 @@ def init( log.warning("Please provide a path to an empty directory.") return init_project(path, config_include_opt) + + +def create_pipeline( + pipeline_path: Path, + kpops_config: KpopsConfig, + environment: str | None, +) -> Pipeline: + registry = Registry() + if kpops_config.components_module: + registry.find_components(kpops_config.components_module) + registry.find_components("kpops.components") + + handlers = setup_handlers(kpops_config) + parser = PipelineGenerator(kpops_config, registry, handlers) + return parser.load_yaml(pipeline_path, environment) + + +def setup_handlers(config: KpopsConfig) -> ComponentHandlers: + schema_handler = SchemaHandler.load_schema_handler(config) + connector_handler = KafkaConnectHandler.from_kpops_config(config) + proxy_wrapper = ProxyWrapper(config.kafka_rest) + topic_handler = TopicHandler(proxy_wrapper) + + return ComponentHandlers(schema_handler, connector_handler, topic_handler) diff --git a/kpops/cli/custom_formatter.py b/kpops/api/custom_formatter.py similarity index 100% rename from kpops/cli/custom_formatter.py rename to kpops/api/custom_formatter.py diff --git a/kpops/exception.py b/kpops/api/exception.py similarity index 50% rename from kpops/exception.py rename to kpops/api/exception.py index 713274f3a..65094fd29 100644 --- a/kpops/exception.py +++ b/kpops/api/exception.py @@ -7,3 +7,7 @@ class ValidationError(Exception): class ParsingException(Exception): pass + + +class ClassNotFoundError(Exception): + """Similar to builtin `ModuleNotFoundError`; class doesn't exist inside module.""" diff --git a/kpops/cli/options.py b/kpops/api/options.py similarity index 100% rename from kpops/cli/options.py rename to kpops/api/options.py diff --git a/kpops/cli/registry.py b/kpops/api/registry.py similarity index 97% rename from kpops/cli/registry.py rename to kpops/api/registry.py index 2b3b51631..2df483329 100644 --- a/kpops/cli/registry.py +++ b/kpops/api/registry.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, TypeVar from kpops import __name__ -from kpops.cli.exception import ClassNotFoundError +from kpops.api.exception import ClassNotFoundError from kpops.components.base_components.pipeline_component import PipelineComponent if TYPE_CHECKING: diff --git a/kpops/cli/exception.py b/kpops/cli/exception.py deleted file mode 100644 index e9b0a65de..000000000 --- a/kpops/cli/exception.py +++ /dev/null @@ -1,2 +0,0 @@ -class ClassNotFoundError(Exception): - """Similar to builtin `ModuleNotFoundError`; class doesn't exist inside module.""" diff --git a/kpops/cli/main.py b/kpops/cli/main.py index d15934e15..4bc324d68 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -8,7 +8,7 @@ import kpops from kpops import __version__ -from kpops.cli.options import FilterType +from kpops.api.options import FilterType from kpops.config import ENV_PREFIX, KpopsConfig from kpops.utils.gen_schema import ( SchemaScope, diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 
936ba0223..94b5cd3bf 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -8,8 +8,8 @@ from schema_registry.client import AsyncSchemaRegistryClient from schema_registry.client.schema import AvroSchema -from kpops.cli.exception import ClassNotFoundError -from kpops.cli.registry import find_class +from kpops.api.exception import ClassNotFoundError +from kpops.api.registry import find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 97394259a..04f95b54b 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -5,12 +5,12 @@ import pydantic from pydantic import BaseModel, ConfigDict, Field, model_validator +from kpops.api.exception import ValidationError from kpops.components.base_components.kafka_app import ( KafkaAppValues, KafkaStreamsConfig, ) from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr -from kpops.exception import ValidationError from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/pipeline.py b/kpops/pipeline.py index a218f335e..5545949c7 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -15,16 +15,10 @@ computed_field, ) -from kpops.cli.registry import Registry +from kpops.api.exception import ParsingException, ValidationError +from kpops.api.registry import Registry from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) -from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler -from kpops.component_handlers.topic.handler import TopicHandler -from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.components.base_components.pipeline_component import PipelineComponent -from kpops.exception import ParsingException, ValidationError from kpops.utils.dict_ops import update_nested_pair from kpops.utils.environment import ENV, PIPELINE_PATH from kpops.utils.yaml import load_yaml_file @@ -48,32 +42,6 @@ class Pipeline(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - @classmethod - def create( - cls, - pipeline_path: Path, - kpops_config: KpopsConfig, - environment: str | None, - ) -> Pipeline: - registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") - - handlers = cls.setup_handlers(kpops_config) - parser = PipelineGenerator(kpops_config, registry, handlers) - return parser.load_yaml(pipeline_path, environment) - - @staticmethod - def setup_handlers(config: KpopsConfig) -> ComponentHandlers: - schema_handler = SchemaHandler.load_schema_handler(config) - connector_handler = KafkaConnectHandler.from_kpops_config(config) - proxy_wrapper = ProxyWrapper(config.kafka_rest) - topic_handler = TopicHandler(proxy_wrapper) - - return ComponentHandlers(schema_handler, connector_handler, topic_handler) - - @computed_field(title="Step Names") @property def step_names(self) -> list[str]: return [step.name for step in self.components] diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index b07749ec6..223e8adda 100644 --- a/kpops/utils/gen_schema.py +++ 
b/kpops/utils/gen_schema.py @@ -21,7 +21,7 @@ ModelFieldsSchema, ) -from kpops.cli.registry import _find_classes +from kpops.api.registry import _find_classes from kpops.components import ( PipelineComponent, ) diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index f873f6731..be1b63474 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -1,5 +1,6 @@ from pytest_mock import MockerFixture +from kpops.api.api import setup_handlers from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, @@ -7,9 +8,10 @@ from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from kpops.config import KpopsConfig, SchemaRegistryConfig -from kpops.pipeline import Pipeline from tests.cli.resources.custom_module import CustomSchemaProvider +HANDLER_MODULE = "kpops.api.api" + MODULE = CustomSchemaProvider.__module__ @@ -18,12 +20,12 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): kafka_brokers="broker:9092", components_module=MODULE, ) - connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") + connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") - wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") + topic_handler_mock = mocker.patch(f"{HANDLER_MODULE}.TopicHandler") + wrapper = mocker.patch(f"{HANDLER_MODULE}.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -33,7 +35,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = Pipeline.setup_handlers(config) + actual_handlers = setup_handlers(config) connector_handler_mock.from_kpops_config.assert_called_once_with(config) @@ -51,16 +53,16 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): schema_registry=SchemaRegistryConfig(enabled=True), kafka_brokers="broker:9092", ) - schema_handler_mock = mocker.patch("kpops.pipeline.SchemaHandler") + schema_handler_mock = mocker.patch(f"{HANDLER_MODULE}.SchemaHandler") schema_handler = SchemaHandler.load_schema_handler(config) schema_handler_mock.load_schema_handler.return_value = schema_handler - connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") + connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") - wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") + topic_handler_mock = mocker.patch(f"{HANDLER_MODULE}.TopicHandler") + wrapper = mocker.patch(f"{HANDLER_MODULE}.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -70,7 +72,7 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = Pipeline.setup_handlers(config) + actual_handlers = setup_handlers(config) schema_handler_mock.load_schema_handler.assert_called_once_with(config) diff --git a/tests/cli/test_registry.py b/tests/cli/test_registry.py index 
f305c24a3..00daee08d 100644 --- a/tests/cli/test_registry.py +++ b/tests/cli/test_registry.py @@ -2,7 +2,8 @@ import pytest -from kpops.cli.registry import ClassNotFoundError, Registry, _find_classes, find_class +from kpops.api.exception import ClassNotFoundError +from kpops.api.registry import Registry, _find_classes, find_class from kpops.component_handlers.schema_handler.schema_provider import SchemaProvider from kpops.components.base_components.pipeline_component import PipelineComponent from tests.cli.resources.custom_module import CustomSchemaProvider diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py index 4e1b4aa09..0741340e0 100644 --- a/tests/cli/test_schema_generation.py +++ b/tests/cli/test_schema_generation.py @@ -9,8 +9,8 @@ from pydantic import ConfigDict, Field from typer.testing import CliRunner +from kpops.api.registry import Registry from kpops.cli.main import app -from kpops.cli.registry import Registry from kpops.components import PipelineComponent from kpops.utils.docstring import describe_attr diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index d6a0f0d87..cb340174a 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -5,6 +5,7 @@ import pytest from pytest_mock import MockerFixture +from kpops.api.exception import ValidationError from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmDiffConfig, @@ -29,7 +30,6 @@ StreamsAppCleaner, ) from kpops.config import KpopsConfig, TopicNameConfig -from kpops.exception import ValidationError from tests.components import PIPELINE_BASE_DIR RESOURCES_PATH = Path(__file__).parent / "resources" diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 9ce6240fd..0f64a144e 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -10,9 +10,9 @@ from typer.testing import CliRunner import kpops +from kpops.api.exception import ParsingException, ValidationError from kpops.cli.main import FilterType, app from kpops.components import KafkaSinkConnector, PipelineComponent -from kpops.exception import ParsingException, ValidationError runner = CliRunner() @@ -23,7 +23,7 @@ class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: - return mocker.patch("kpops.api.log.info") + return mocker.patch("kpops.api.api.log.info") def test_python_api(self): pipeline = kpops.generate( diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 6f79c8f37..b5c741ad0 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -3,7 +3,7 @@ import pytest from polyfactory.factories.pydantic_factory import ModelFactory -from kpops.cli.options import FilterType +from kpops.api.options import FilterType from kpops.component_handlers import ( ComponentHandlers, ) From 5b5acafcac449cb8de6671691356ccde57f94749 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 13:43:43 +0200 Subject: [PATCH 06/10] address reviews --- kpops/__init__.py | 2 +- kpops/api/__init__.py | 272 +++++++++++++++++++ kpops/api/api.py | 290 --------------------- kpops/api/{custom_formatter.py => logs.py} | 24 ++ tests/cli/test_handlers.py | 4 +- tests/pipeline/test_generate.py | 2 +- 6 files changed, 300 insertions(+), 294 deletions(-) delete mode 100644 kpops/api/api.py rename kpops/api/{custom_formatter.py => logs.py} (55%) diff --git 
a/kpops/__init__.py b/kpops/__init__.py index ccb7bb4ce..778df97f8 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,7 +1,7 @@ __version__ = "5.1.0" # export public API functions -from kpops.api.api import clean, deploy, destroy, generate, init, manifest, reset +from kpops.api import clean, deploy, destroy, generate, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index e69de29bb..481028e1a 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -0,0 +1,272 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import TYPE_CHECKING + +import kpops +from kpops.api.logs import log, log_action +from kpops.api.options import FilterType +from kpops.api.registry import Registry +from kpops.component_handlers import ComponentHandlers +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) +from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler +from kpops.component_handlers.topic.handler import TopicHandler +from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper +from kpops.config import KpopsConfig +from kpops.pipeline import ( + Pipeline, + PipelineGenerator, +) +from kpops.utils.cli_commands import init_project + +if TYPE_CHECKING: + from kpops.components import PipelineComponent + from kpops.components.base_components.models.resource import Resource + from kpops.config import KpopsConfig + + +def parse_steps(steps: str) -> set[str]: + return set(steps.split(",")) + + +def generate( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> Pipeline: + kpops_config = KpopsConfig.create( + config, + dotenv, + environment, + verbose, + ) + pipeline = create_pipeline(pipeline_path, kpops_config, environment) + log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") + if steps: + component_names = parse_steps(steps) + log.debug( + f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" + ) + + predicate = filter_type.create_default_step_names_filter_predicate( + component_names + ) + pipeline.filter(predicate) + log.info(f"Filtered pipeline:\n{pipeline.step_names}") + return pipeline + + +def manifest( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> list[Resource]: + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + resources: list[Resource] = [] + for component in pipeline.components: + resource = component.manifest() + resources.append(resource) + return resources + + +def deploy( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def deploy_runner(component: 
PipelineComponent): + log_action("Deploy", component) + await component.deploy(dry_run) + + async def async_deploy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(deploy_runner) + await pipeline_tasks + else: + for component in pipeline.components: + await deploy_runner(component) + + asyncio.run(async_deploy()) + + +def destroy( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def destroy_runner(component: PipelineComponent): + log_action("Destroy", component) + await component.destroy(dry_run) + + async def async_destroy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph( + destroy_runner, reverse=True + ) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await destroy_runner(component) + + asyncio.run(async_destroy()) + + +def reset( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def reset_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Reset", component) + await component.reset(dry_run) + + async def async_reset(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await reset_runner(component) + + asyncio.run(async_reset()) + + +def clean( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def clean_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Clean", component) + await component.clean(dry_run) + + async def async_clean(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await clean_runner(component) + + asyncio.run(async_clean()) + + +def init( + path: Path, + config_include_opt: bool = False, +): + if not path.exists(): + path.mkdir(parents=False) + elif next(path.iterdir(), False): + log.warning("Please provide a path to an empty directory.") + return + init_project(path, config_include_opt) + + +def create_pipeline( + pipeline_path: Path, + kpops_config: KpopsConfig, + environment: str | None, +) -> Pipeline: + registry = Registry() + if kpops_config.components_module: + registry.find_components(kpops_config.components_module) + 
registry.find_components("kpops.components") + + handlers = setup_handlers(kpops_config) + parser = PipelineGenerator(kpops_config, registry, handlers) + return parser.load_yaml(pipeline_path, environment) + + +def setup_handlers(config: KpopsConfig) -> ComponentHandlers: + schema_handler = SchemaHandler.load_schema_handler(config) + connector_handler = KafkaConnectHandler.from_kpops_config(config) + proxy_wrapper = ProxyWrapper(config.kafka_rest) + topic_handler = TopicHandler(proxy_wrapper) + + return ComponentHandlers(schema_handler, connector_handler, topic_handler) diff --git a/kpops/api/api.py b/kpops/api/api.py deleted file mode 100644 index 5903e6f21..000000000 --- a/kpops/api/api.py +++ /dev/null @@ -1,290 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -from pathlib import Path -from typing import TYPE_CHECKING - -import kpops -from kpops.api.custom_formatter import CustomFormatter -from kpops.api.options import FilterType -from kpops.api.registry import Registry -from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) -from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler -from kpops.component_handlers.topic.handler import TopicHandler -from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper -from kpops.config import KpopsConfig -from kpops.pipeline import ( - Pipeline, - PipelineGenerator, -) -from kpops.utils.cli_commands import init_project - -if TYPE_CHECKING: - from kpops.components import PipelineComponent - from kpops.components.base_components.models.resource import Resource - from kpops.config import KpopsConfig - -logger = logging.getLogger() -logging.getLogger("httpx").setLevel(logging.WARNING) -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(CustomFormatter()) -logger.addHandler(stream_handler) - -log = logging.getLogger("") -LOG_DIVIDER = "#" * 100 - - -def log_action(action: str, pipeline_component: PipelineComponent): - log.info("\n") - log.info(LOG_DIVIDER) - log.info(f"{action} {pipeline_component.name}") - log.info(LOG_DIVIDER) - log.info("\n") - - -def parse_steps(steps: str) -> set[str]: - return set(steps.split(",")) - - -def generate( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - verbose: bool = False, -) -> Pipeline: - kpops_config = KpopsConfig.create( - config, - dotenv, - environment, - verbose, - ) - pipeline = create_pipeline(pipeline_path, kpops_config, environment) - log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") - if steps: - component_names = parse_steps(steps) - log.debug( - f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" - ) - - predicate = filter_type.create_default_step_names_filter_predicate( - component_names - ) - pipeline.filter(predicate) - log.info(f"Filtered pipeline:\n{pipeline.step_names}") - return pipeline - - -def manifest( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - verbose: bool = False, -) -> list[Resource]: - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - 
verbose=verbose, - ) - resources: list[Resource] = [] - for component in pipeline.components: - resource = component.manifest() - resources.append(resource) - return resources - - -def deploy( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def deploy_runner(component: PipelineComponent): - log_action("Deploy", component) - await component.deploy(dry_run) - - async def async_deploy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(deploy_runner) - await pipeline_tasks - else: - for component in pipeline.components: - await deploy_runner(component) - - asyncio.run(async_deploy()) - - -def destroy( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def destroy_runner(component: PipelineComponent): - log_action("Destroy", component) - await component.destroy(dry_run) - - async def async_destroy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph( - destroy_runner, reverse=True - ) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await destroy_runner(component) - - asyncio.run(async_destroy()) - - -def reset( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def reset_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Reset", component) - await component.reset(dry_run) - - async def async_reset(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await reset_runner(component) - - asyncio.run(async_reset()) - - -def clean( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def clean_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Clean", component) - await component.clean(dry_run) - - async def async_clean(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(clean_runner, 
reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await clean_runner(component) - - asyncio.run(async_clean()) - - -def init( - path: Path, - config_include_opt: bool = False, -): - if not path.exists(): - path.mkdir(parents=False) - elif next(path.iterdir(), False): - log.warning("Please provide a path to an empty directory.") - return - init_project(path, config_include_opt) - - -def create_pipeline( - pipeline_path: Path, - kpops_config: KpopsConfig, - environment: str | None, -) -> Pipeline: - registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") - - handlers = setup_handlers(kpops_config) - parser = PipelineGenerator(kpops_config, registry, handlers) - return parser.load_yaml(pipeline_path, environment) - - -def setup_handlers(config: KpopsConfig) -> ComponentHandlers: - schema_handler = SchemaHandler.load_schema_handler(config) - connector_handler = KafkaConnectHandler.from_kpops_config(config) - proxy_wrapper = ProxyWrapper(config.kafka_rest) - topic_handler = TopicHandler(proxy_wrapper) - - return ComponentHandlers(schema_handler, connector_handler, topic_handler) diff --git a/kpops/api/custom_formatter.py b/kpops/api/logs.py similarity index 55% rename from kpops/api/custom_formatter.py rename to kpops/api/logs.py index 69fc1c73d..e9a833aba 100644 --- a/kpops/api/custom_formatter.py +++ b/kpops/api/logs.py @@ -1,7 +1,13 @@ +from __future__ import annotations + import logging +from typing import TYPE_CHECKING import typer +if TYPE_CHECKING: + from kpops.components import PipelineComponent + class CustomFormatter(logging.Formatter): def format(self, record): @@ -23,3 +29,21 @@ def format(self, record): log_fmt = formats.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) + + +logger = logging.getLogger() +logging.getLogger("httpx").setLevel(logging.WARNING) +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(CustomFormatter()) +logger.addHandler(stream_handler) + +log = logging.getLogger("") +LOG_DIVIDER = "#" * 100 + + +def log_action(action: str, pipeline_component: PipelineComponent): + log.info("\n") + log.info(LOG_DIVIDER) + log.info(f"{action} {pipeline_component.name}") + log.info(LOG_DIVIDER) + log.info("\n") diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index be1b63474..18d35131b 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -1,6 +1,6 @@ from pytest_mock import MockerFixture -from kpops.api.api import setup_handlers +from kpops.api import setup_handlers from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, @@ -10,7 +10,7 @@ from kpops.config import KpopsConfig, SchemaRegistryConfig from tests.cli.resources.custom_module import CustomSchemaProvider -HANDLER_MODULE = "kpops.api.api" +HANDLER_MODULE = "kpops.api" MODULE = CustomSchemaProvider.__module__ diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 0f64a144e..41d5fb54a 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -23,7 +23,7 @@ class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: - return mocker.patch("kpops.api.api.log.info") + return mocker.patch("kpops.api.log.info") def test_python_api(self): pipeline = kpops.generate( From 
edf0359b92b47ef259a0d6a2db840077c49e37ef Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 13:48:01 +0200 Subject: [PATCH 07/10] remove dtyper --- kpops/cli/main.py | 3 +-- poetry.lock | 16 +--------------- pyproject.toml | 1 - 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 4bc324d68..b5a151621 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -3,7 +3,6 @@ from pathlib import Path from typing import Optional -import dtyper import typer import kpops @@ -18,7 +17,7 @@ ) from kpops.utils.yaml import print_yaml -app = dtyper.Typer(pretty_exceptions_enable=False) +app = typer.Typer(pretty_exceptions_enable=False) DOTENV_PATH_OPTION: Optional[list[Path]] = typer.Option( default=None, diff --git a/poetry.lock b/poetry.lock index 1089dfff0..e6f270466 100644 --- a/poetry.lock +++ b/poetry.lock @@ -358,20 +358,6 @@ files = [ {file = "distlib-0.3.6.tar.gz", hash = "sha256:14bad2d9b04d3a36127ac97f30b12a19268f211063d8f8ee4f47108896e11b46"}, ] -[[package]] -name = "dtyper" -version = "2.1.0" -description = "🗝 Make `typer` commands callable, or dataclasses 🗝" -optional = false -python-versions = ">=3.7" -files = [ - {file = "dtyper-2.1.0-py3-none-any.whl", hash = "sha256:331f513b33ccd43c1a803a2a06cdb879ed3925381aed9c04bd65470d58107ac6"}, - {file = "dtyper-2.1.0.tar.gz", hash = "sha256:c18a7198c4d9f9194f862307dc326f2594acaffb8e2a6f81335ebc8bf6ed9e40"}, -] - -[package.dependencies] -typer = "*" - [[package]] name = "exceptiongroup" version = "1.0.4" @@ -2303,4 +2289,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = ">=3.10, <3.13" -content-hash = "dc29f4a291f406a5d651b0a82645fd8457387a4a821d71a4da1a7a3bffa18d7e" +content-hash = "80ac595bb43560de5e123076830390054b2c9c15b865f3665e1e1b114a31faad" diff --git a/pyproject.toml b/pyproject.toml index f79b03727..96b3852d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,6 @@ pydantic-settings = "^2.0.3" rich = "^12.4.4" PyYAML = "^6.0" typer = { extras = ["all"], version = "^0.6.1" } -dtyper = "^2.1.0" pyhumps = "^3.7.3" cachetools = "^5.2.0" dictdiffer = "^0.9.0" From b405b610efc07e91016677b4d87d41a7eac5b541 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 14:21:39 +0200 Subject: [PATCH 08/10] address reviews --- kpops/api/__init__.py | 18 +++++++---------- kpops/cli/main.py | 46 ++++++++++++++++++++++--------------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index 481028e1a..826307308 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -28,15 +28,11 @@ from kpops.config import KpopsConfig -def parse_steps(steps: str) -> set[str]: - return set(steps.split(",")) - - def generate( pipeline_path: Path, dotenv: list[Path] | None = None, config: Path = Path(), - steps: str | None = None, + steps: set[str] | None = None, filter_type: FilterType = FilterType.INCLUDE, environment: str | None = None, verbose: bool = False, @@ -50,7 +46,7 @@ def generate( pipeline = create_pipeline(pipeline_path, kpops_config, environment) log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") if steps: - component_names = parse_steps(steps) + component_names = steps log.debug( f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" ) @@ -67,7 +63,7 @@ def manifest( pipeline_path: Path, dotenv: list[Path] | None = None, config: Path = Path(), - steps: str | None = None, + steps: set[str] | None = None, 
filter_type: FilterType = FilterType.INCLUDE, environment: str | None = None, verbose: bool = False, @@ -92,7 +88,7 @@ def deploy( pipeline_path: Path, dotenv: list[Path] | None = None, config: Path = Path(), - steps: str | None = None, + steps: set[str] | None = None, filter_type: FilterType = FilterType.INCLUDE, environment: str | None = None, dry_run: bool = True, @@ -128,7 +124,7 @@ def destroy( pipeline_path: Path, dotenv: list[Path] | None = None, config: Path = Path(), - steps: str | None = None, + steps: set[str] | None = None, filter_type: FilterType = FilterType.INCLUDE, environment: str | None = None, dry_run: bool = True, @@ -166,7 +162,7 @@ def reset( pipeline_path: Path, dotenv: list[Path] | None = None, config: Path = Path(), - steps: str | None = None, + steps: set[str] | None = None, filter_type: FilterType = FilterType.INCLUDE, environment: str | None = None, dry_run: bool = True, @@ -203,7 +199,7 @@ def clean( pipeline_path: Path, dotenv: list[Path] | None = None, config: Path = Path(), - steps: str | None = None, + steps: set[str] | None = None, filter_type: FilterType = FilterType.INCLUDE, environment: str | None = None, dry_run: bool = True, diff --git a/kpops/cli/main.py b/kpops/cli/main.py index b5a151621..3a3c17471 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -105,9 +105,11 @@ ) -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 - help="Initialize a new KPOps project." -) +def parse_steps(steps: str | None) -> set[str] | None: + return set(steps.split(",")) if steps else None + + +@app.command(help="Initialize a new KPOps project.") def init( path: Path = PROJECT_PATH, config_include_opt: bool = CONFIG_INCLUDE_OPTIONAL, @@ -115,7 +117,7 @@ def init( kpops.init(path, config_include_opt=config_include_opt) -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command( help=""" Generate JSON schema. @@ -153,7 +155,7 @@ def schema( gen_config_schema() -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command( short_help="Generate enriched pipeline representation", help="Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).", ) @@ -167,18 +169,18 @@ def generate( verbose: bool = VERBOSE_OPTION, ): pipeline = kpops.generate( - pipeline_path, - dotenv, - config, - steps, - filter_type, - environment, - verbose, + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + verbose=verbose, ) print_yaml(pipeline.to_yaml()) -@app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command( short_help="Render final resource representation", help="In addition to generate, render final resource representation for each pipeline step, e.g. 
Kubernetes manifests.", ) @@ -195,7 +197,7 @@ def manifest( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - steps=steps, + steps=parse_steps(steps), filter_type=filter_type, environment=environment, verbose=verbose, @@ -205,7 +207,7 @@ def manifest( print_yaml(rendered_manifest) -@app.command(help="Deploy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command(help="Deploy pipeline steps") def deploy( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, @@ -221,7 +223,7 @@ def deploy( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - steps=steps, + steps=parse_steps(steps), filter_type=filter_type, environment=environment, dry_run=dry_run, @@ -230,7 +232,7 @@ def deploy( ) -@app.command(help="Destroy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command(help="Destroy pipeline steps") def destroy( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, @@ -246,7 +248,7 @@ def destroy( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - steps=steps, + steps=parse_steps(steps), filter_type=filter_type, environment=environment, dry_run=dry_run, @@ -255,7 +257,7 @@ def destroy( ) -@app.command(help="Reset pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command(help="Reset pipeline steps") def reset( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, @@ -271,7 +273,7 @@ def reset( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - steps=steps, + steps=parse_steps(steps), filter_type=filter_type, environment=environment, dry_run=dry_run, @@ -280,7 +282,7 @@ def reset( ) -@app.command(help="Clean pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 +@app.command(help="Clean pipeline steps") def clean( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, @@ -296,7 +298,7 @@ def clean( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - steps=steps, + steps=parse_steps(steps), filter_type=filter_type, environment=environment, dry_run=dry_run, From 17894022e520e655bfca2f52704fa042d6026125 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 14:26:22 +0200 Subject: [PATCH 09/10] move tests --- tests/api/__init__.py | 0 tests/{cli => api}/test_handlers.py | 0 tests/{cli => api}/test_registry.py | 0 tests/{cli => }/test_kpops_config.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/api/__init__.py rename tests/{cli => api}/test_handlers.py (100%) rename tests/{cli => api}/test_registry.py (100%) rename tests/{cli => }/test_kpops_config.py (100%) diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/cli/test_handlers.py b/tests/api/test_handlers.py similarity index 100% rename from tests/cli/test_handlers.py rename to tests/api/test_handlers.py diff --git a/tests/cli/test_registry.py b/tests/api/test_registry.py similarity index 100% rename from tests/cli/test_registry.py rename to tests/api/test_registry.py diff --git a/tests/cli/test_kpops_config.py b/tests/test_kpops_config.py similarity index 100% rename from tests/cli/test_kpops_config.py rename to tests/test_kpops_config.py From fceda7ab67aaccfe5ad1a673fc38ed95aa0c83f5 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 15:52:14 
+0200 Subject: [PATCH 10/10] fix pyright errors --- tests/pipeline/test_generate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 41d5fb54a..0aa50fe3e 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -39,7 +39,7 @@ def test_python_api(self): def test_python_api_filter_include(self, log_info: MagicMock): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - steps="converter", + steps={"converter"}, filter_type=FilterType.INCLUDE, ) assert len(pipeline) == 1 @@ -51,7 +51,7 @@ def test_python_api_filter_include(self, log_info: MagicMock): def test_python_api_filter_exclude(self, log_info: MagicMock): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - steps="converter,scheduled-producer", + steps={"converter", "scheduled-producer"}, filter_type=FilterType.EXCLUDE, ) assert len(pipeline) == 1
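
After this patch series, the public Python API takes pipeline steps as a set of names rather than a comma-separated string (parsing now happens only in the CLI). A minimal usage sketch follows; the pipeline path and the printed output are illustrative assumptions, while the call signature and the FilterType import mirror the updated tests above.

from pathlib import Path

import kpops
from kpops.api.options import FilterType

# Hypothetical pipeline path; adjust to your project layout.
pipeline = kpops.generate(
    Path("pipelines/my-pipeline/pipeline.yaml"),
    steps={"converter"},                # set of step names, no longer "a,b,c"
    filter_type=FilterType.INCLUDE,     # keep only the listed steps
)
print(pipeline.to_yaml())               # enriched pipeline representation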