From 7404a158a1966d8501a78552f724dba10829e739 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 10 May 2024 09:21:48 +0200 Subject: [PATCH 01/21] Deploy multiple pipelines --- examples | 2 +- kpops/cli/main.py | 60 +++-- kpops/pipeline.py | 4 + .../pipeline-folder/pipeline-1/pipeline.yaml | 5 + .../pipeline-folder/pipeline-2/pipeline.yaml | 9 + .../pipeline-folder/pipeline-3/pipeline.yaml | 12 + .../pipeline.yaml | 244 ++++++++++++++++++ tests/pipeline/test_generate.py | 14 + 8 files changed, 330 insertions(+), 20 deletions(-) create mode 100644 tests/pipeline/resources/pipeline-folder/pipeline-1/pipeline.yaml create mode 100644 tests/pipeline/resources/pipeline-folder/pipeline-2/pipeline.yaml create mode 100644 tests/pipeline/resources/pipeline-folder/pipeline-3/pipeline.yaml create mode 100644 tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml diff --git a/examples b/examples index 95fe43b15..f7613426d 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit 95fe43b1504836d5d36688f75dfbea6598d8281a +Subproject commit f7613426dffe5a1d6332c9e3cc7f0bfb23396e68 diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 6bdcd8dc4..94f5345ee 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -66,7 +66,7 @@ default=..., exists=True, file_okay=True, - dir_okay=False, + dir_okay=True, readable=True, envvar=f"{ENV_PREFIX}PIPELINE_PATH", help="Path to YAML with pipeline definition", @@ -259,6 +259,15 @@ def schema( gen_config_schema() +def collect_pipeline_paths(pipeline_path: Path) -> list[Path]: + paths = [] + if pipeline_path.is_dir(): + pipeline_file_paths_iter = pipeline_path.glob("**/pipeline*.yaml") + for pipeline_file_path in pipeline_file_paths_iter: + paths.append(pipeline_file_path) + return paths + + @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 short_help="Generate enriched pipeline representation", help="Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).", @@ -279,27 +288,40 @@ def generate( environment, verbose, ) + if pipeline_path.is_dir(): + pipeline_file_paths = collect_pipeline_paths(pipeline_path) + else: + pipeline_file_paths = [pipeline_path] + list_pipeline = [] + for pipeline_file_path in pipeline_file_paths: + pipeline = setup_pipeline(pipeline_file_path, kpops_config, environment) + + if steps: + component_names = parse_steps(steps) + log.debug( + f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" + ) - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - - if steps: - component_names = parse_steps(steps) - log.debug( - f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" - ) - - predicate = create_default_step_names_filter_predicate( - component_names, filter_type - ) - pipeline.filter(predicate) + predicate = create_default_step_names_filter_predicate( + component_names, filter_type + ) + pipeline.filter(predicate) - def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]: - return [step.name for step in steps_to_apply] + def get_step_names( + steps_to_apply: list[PipelineComponent], + ) -> list[str]: + return [step.name for step in steps_to_apply] - log.info(f"Filtered pipeline:\n{get_step_names(pipeline.components)}") - if output: - print_yaml(pipeline.to_yaml()) - return pipeline + log.info(f"Filtered pipeline:\n{get_step_names(pipeline.components)}") + if output: + print_yaml(pipeline.to_yaml()) + list_pipeline.append(pipeline) + + # TODO: Check if this logic breaks anything or not... We need to return a single Pipeline object. + base_pipeline = Pipeline() + for pipeline in list_pipeline: + base_pipeline.add_all(pipeline) + return base_pipeline @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 3f4f698b9..7f0c9e841 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -59,6 +59,10 @@ def components(self) -> list[SerializeAsAny[PipelineComponent]]: def last(self) -> PipelineComponent: return self.components[-1] + def add_all(self, pipeline: Pipeline) -> None: + for component in pipeline.components: + self.add(component) + def add(self, component: PipelineComponent) -> None: if self._component_index.get(component.id) is not None: msg = ( diff --git a/tests/pipeline/resources/pipeline-folder/pipeline-1/pipeline.yaml b/tests/pipeline/resources/pipeline-folder/pipeline-1/pipeline.yaml new file mode 100644 index 000000000..503665e9a --- /dev/null +++ b/tests/pipeline/resources/pipeline-folder/pipeline-1/pipeline.yaml @@ -0,0 +1,5 @@ +- type: scheduled-producer + app: + commandLine: + FAKE_ARG: "fake-arg-value" + schedule: "30 3/8 * * *" diff --git a/tests/pipeline/resources/pipeline-folder/pipeline-2/pipeline.yaml b/tests/pipeline/resources/pipeline-folder/pipeline-2/pipeline.yaml new file mode 100644 index 000000000..0dfc1da57 --- /dev/null +++ b/tests/pipeline/resources/pipeline-folder/pipeline-2/pipeline.yaml @@ -0,0 +1,9 @@ +- type: converter + app: + commandLine: + CONVERT_XML: true + resources: + limits: + memory: 2G + requests: + memory: 2G diff --git a/tests/pipeline/resources/pipeline-folder/pipeline-3/pipeline.yaml b/tests/pipeline/resources/pipeline-folder/pipeline-3/pipeline.yaml new file mode 100644 index 000000000..99571e5e6 --- /dev/null +++ b/tests/pipeline/resources/pipeline-folder/pipeline-3/pipeline.yaml @@ -0,0 +1,12 @@ +- type: filter + name: "a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name" + app: + commandLine: + TYPE: "nothing" + resources: + requests: + memory: 3G + replicaCount: 4 + autoscaling: + minReplicas: 4 + maxReplicas: 4 diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml new file mode 100644 index 000000000..b77eb687b --- /dev/null +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml @@ -0,0 +1,244 @@ +- _cleaner: + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folder-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folder-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + ? resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + : configs: + retention.ms: '-1' + partitions_count: 50 + type: output + ? resources-pipeline-folder-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + : configs: + cleanup.policy: compact,delete + partitions_count: 1 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: filter + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folder-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folder-pipeline-2-converter-error + outputTopic: resources-pipeline-folder-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + name: converter + namespace: example-namespace + prefix: resources-pipeline-folder-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folder-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folder-pipeline-2-converter-error + outputTopic: resources-pipeline-folder-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + name: converter + namespace: example-namespace + prefix: resources-pipeline-folder-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + resources-pipeline-folder-pipeline-2-converter: + configs: + cleanup.policy: compact,delete + retention.ms: '-1' + partitions_count: 50 + type: output + resources-pipeline-folder-pipeline-2-converter-error: + configs: + cleanup.policy: compact,delete + partitions_count: 10 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: converter + version: 2.4.2 + +- _cleaner: + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folder-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folder-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: producer-app-cleaner + version: 2.4.2 + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folder-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folder-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: + com/bakdata/kafka/fake: 1.0.0 + topics: + resources-pipeline-folder-pipeline-1-scheduled-producer: + configs: + cleanup.policy: compact,delete + partitions_count: 12 + type: output + value_schema: com.bakdata.fake.Produced + type: scheduled-producer + version: 2.4.2 + diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 8b237f705..db2625c35 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -77,6 +77,20 @@ def test_load_pipeline(self, snapshot: Snapshot): snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_load_pipeline_with_folder_path(self, snapshot: Snapshot): + result = runner.invoke( + app, + [ + "generate", + str(RESOURCE_PATH / "pipeline-folder"), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.stdout + + snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_name_equal_prefix_name_concatenation(self): result = runner.invoke( app, From a2f78a1afd1792c0a3ccb67ed71d8180f1aecbb4 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 10 May 2024 10:00:57 +0200 Subject: [PATCH 02/21] refactor utils --- kpops/cli/main.py | 22 +++++----------------- kpops/cli/utils.py | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 17 deletions(-) create mode 100644 kpops/cli/utils.py diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 94f5345ee..1ce06e9af 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -12,6 +12,7 @@ from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType from kpops.cli.registry import Registry +from kpops.cli.utils import collect_pipeline_paths from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, @@ -259,15 +260,6 @@ def schema( gen_config_schema() -def collect_pipeline_paths(pipeline_path: Path) -> list[Path]: - paths = [] - if pipeline_path.is_dir(): - pipeline_file_paths_iter = pipeline_path.glob("**/pipeline*.yaml") - for pipeline_file_path in pipeline_file_paths_iter: - paths.append(pipeline_file_path) - return paths - - @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 short_help="Generate enriched pipeline representation", help="Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).", @@ -288,12 +280,8 @@ def generate( environment, verbose, ) - if pipeline_path.is_dir(): - pipeline_file_paths = collect_pipeline_paths(pipeline_path) - else: - pipeline_file_paths = [pipeline_path] list_pipeline = [] - for pipeline_file_path in pipeline_file_paths: + for pipeline_file_path in collect_pipeline_paths(pipeline_path): pipeline = setup_pipeline(pipeline_file_path, kpops_config, environment) if steps: @@ -318,10 +306,10 @@ def get_step_names( list_pipeline.append(pipeline) # TODO: Check if this logic breaks anything or not... We need to return a single Pipeline object. - base_pipeline = Pipeline() + super_pipeline = Pipeline() for pipeline in list_pipeline: - base_pipeline.add_all(pipeline) - return base_pipeline + super_pipeline.add_all(pipeline) + return super_pipeline @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py new file mode 100644 index 000000000..c5f20432e --- /dev/null +++ b/kpops/cli/utils.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from collections.abc import Generator +from pathlib import Path + + +def collect_pipeline_paths(pipeline_path: Path) -> Generator[Path, None, None]: + """Generate paths to pipeline files. + + :param pipeline_path: The path to the pipeline file or directory. + + :yields: Path: Paths to pipeline files. If `pipeline_path` file yields the given path. + For a directory it yields all the pipeline.yaml paths. + + :raises: RuntimeError: If `pipeline_path` is neither a file nor a directory. + """ + if pipeline_path.is_file(): + yield pipeline_path + elif pipeline_path.is_dir(): + yield from pipeline_path.glob("**/pipeline*.yaml") + msg = "Pipeline path is not a file or directory." + raise RuntimeError(msg) From e3c0d754d1961bbd17230123851563289e152f91 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 10 May 2024 10:01:15 +0200 Subject: [PATCH 03/21] Update files --- kpops/cli/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py index c5f20432e..3fbef4744 100644 --- a/kpops/cli/utils.py +++ b/kpops/cli/utils.py @@ -18,5 +18,6 @@ def collect_pipeline_paths(pipeline_path: Path) -> Generator[Path, None, None]: yield pipeline_path elif pipeline_path.is_dir(): yield from pipeline_path.glob("**/pipeline*.yaml") + # TODO: Can this ever happen? msg = "Pipeline path is not a file or directory." raise RuntimeError(msg) From b118ec40e9428c118598ed61cac025f00742d373 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 10 May 2024 10:09:49 +0200 Subject: [PATCH 04/21] refactor utils --- kpops/cli/main.py | 14 ++++++-------- kpops/cli/utils.py | 7 ++++--- kpops/pipeline.py | 5 +++++ 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 1ce06e9af..3e0186507 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -22,7 +22,11 @@ from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.components.base_components.models.resource import Resource from kpops.config import ENV_PREFIX, KpopsConfig -from kpops.pipeline import ComponentFilterPredicate, Pipeline, PipelineGenerator +from kpops.pipeline import ( + ComponentFilterPredicate, + Pipeline, + PipelineGenerator, +) from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( SchemaScope, @@ -294,13 +298,7 @@ def generate( component_names, filter_type ) pipeline.filter(predicate) - - def get_step_names( - steps_to_apply: list[PipelineComponent], - ) -> list[str]: - return [step.name for step in steps_to_apply] - - log.info(f"Filtered pipeline:\n{get_step_names(pipeline.components)}") + log.info(f"Filtered pipeline:\n{pipeline.step_names}") if output: print_yaml(pipeline.to_yaml()) list_pipeline.append(pipeline) diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py index 3fbef4744..fa658b183 100644 --- a/kpops/cli/utils.py +++ b/kpops/cli/utils.py @@ -18,6 +18,7 @@ def collect_pipeline_paths(pipeline_path: Path) -> Generator[Path, None, None]: yield pipeline_path elif pipeline_path.is_dir(): yield from pipeline_path.glob("**/pipeline*.yaml") - # TODO: Can this ever happen? - msg = "Pipeline path is not a file or directory." - raise RuntimeError(msg) + else: + # TODO: Can this ever happen? + msg = "Pipeline path is not a file or directory." + raise RuntimeError(msg) diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 7f0c9e841..85bfa3cb3 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -63,6 +63,11 @@ def add_all(self, pipeline: Pipeline) -> None: for component in pipeline.components: self.add(component) + @computed_field(title="Step Names") + @property + def step_names(self) -> list[str]: + return [step.name for step in self.components] + def add(self, component: PipelineComponent) -> None: if self._component_index.get(component.id) is not None: msg = ( From b12566669a29f70cd073fd63999022b31ac08de7 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 10 May 2024 10:18:10 +0200 Subject: [PATCH 05/21] Improve printing --- kpops/cli/main.py | 12 ++++-------- kpops/pipeline.py | 4 ++++ .../pipeline.yaml | 2 -- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 3e0186507..5de0334ea 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -284,7 +284,7 @@ def generate( environment, verbose, ) - list_pipeline = [] + super_pipeline = Pipeline() for pipeline_file_path in collect_pipeline_paths(pipeline_path): pipeline = setup_pipeline(pipeline_file_path, kpops_config, environment) @@ -299,14 +299,10 @@ def generate( ) pipeline.filter(predicate) log.info(f"Filtered pipeline:\n{pipeline.step_names}") - if output: - print_yaml(pipeline.to_yaml()) - list_pipeline.append(pipeline) + super_pipeline.append(pipeline) - # TODO: Check if this logic breaks anything or not... We need to return a single Pipeline object. - super_pipeline = Pipeline() - for pipeline in list_pipeline: - super_pipeline.add_all(pipeline) + if output: + print_yaml(super_pipeline.to_yaml()) return super_pipeline diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 85bfa3cb3..3e429d4c7 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -59,6 +59,10 @@ def components(self) -> list[SerializeAsAny[PipelineComponent]]: def last(self) -> PipelineComponent: return self.components[-1] + def append(self, pipeline: Pipeline) -> None: + for component in pipeline.components: + self.add(component) + def add_all(self, pipeline: Pipeline) -> None: for component in pipeline.components: self.add(component) diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml index b77eb687b..d27bc3ac3 100644 --- a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml @@ -94,7 +94,6 @@ value_schema: com.bakdata.kafka.DeadLetter type: filter version: 2.4.2 - - _cleaner: app: autoscaling: @@ -188,7 +187,6 @@ value_schema: com.bakdata.kafka.DeadLetter type: converter version: 2.4.2 - - _cleaner: app: commandLine: From 53c6d1a1938c1296669d1f9f3ec98e95c97497da Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 10 May 2024 10:45:38 +0200 Subject: [PATCH 06/21] Add log --- kpops/cli/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 5de0334ea..a0bdbe14d 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -287,7 +287,7 @@ def generate( super_pipeline = Pipeline() for pipeline_file_path in collect_pipeline_paths(pipeline_path): pipeline = setup_pipeline(pipeline_file_path, kpops_config, environment) - + log.info(f"Picked up pipeline {pipeline_file_path.parent.name}") if steps: component_names = parse_steps(steps) log.debug( From 772b57828f392d0f24d29536e00f808323a6d44d Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 10 May 2024 14:12:00 +0200 Subject: [PATCH 07/21] Fix tests --- kpops/cli/main.py | 2 +- tests/pipeline/test_generate.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index a0bdbe14d..6f48cbc71 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -287,7 +287,7 @@ def generate( super_pipeline = Pipeline() for pipeline_file_path in collect_pipeline_paths(pipeline_path): pipeline = setup_pipeline(pipeline_file_path, kpops_config, environment) - log.info(f"Picked up pipeline {pipeline_file_path.parent.name}") + log.info(f"Picked up pipeline '{pipeline_file_path.parent.name}'") if steps: component_names = parse_steps(steps) log.debug( diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index db2625c35..f19f3b2c0 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -46,7 +46,8 @@ def test_python_api_filter_include(self, log_info: MagicMock): ) assert len(pipeline) == 1 assert pipeline.components[0].type == "converter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call("Filtered pipeline:\n['converter']") def test_python_api_filter_exclude(self, log_info: MagicMock): @@ -58,7 +59,8 @@ def test_python_api_filter_exclude(self, log_info: MagicMock): ) assert len(pipeline) == 1 assert pipeline.components[0].type == "filter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call( "Filtered pipeline:\n['a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name']" ) From 3c04b3f6b9b4a67308b74f44e808e1b4f73159ba Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 13 May 2024 10:17:54 +0200 Subject: [PATCH 08/21] address reviews --- kpops/cli/main.py | 2 +- kpops/cli/utils.py | 7 +++---- kpops/pipeline.py | 6 +----- 3 files changed, 5 insertions(+), 10 deletions(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 6f48cbc71..5c0dc1dd7 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -299,7 +299,7 @@ def generate( ) pipeline.filter(predicate) log.info(f"Filtered pipeline:\n{pipeline.step_names}") - super_pipeline.append(pipeline) + super_pipeline.extend(pipeline) if output: print_yaml(super_pipeline.to_yaml()) diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py index fa658b183..986d95e95 100644 --- a/kpops/cli/utils.py +++ b/kpops/cli/utils.py @@ -1,10 +1,10 @@ from __future__ import annotations -from collections.abc import Generator +from collections.abc import Iterator from pathlib import Path -def collect_pipeline_paths(pipeline_path: Path) -> Generator[Path, None, None]: +def collect_pipeline_paths(pipeline_path: Path) -> Iterator[Path]: """Generate paths to pipeline files. :param pipeline_path: The path to the pipeline file or directory. @@ -19,6 +19,5 @@ def collect_pipeline_paths(pipeline_path: Path) -> Generator[Path, None, None]: elif pipeline_path.is_dir(): yield from pipeline_path.glob("**/pipeline*.yaml") else: - # TODO: Can this ever happen? - msg = "Pipeline path is not a file or directory." + msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file." raise RuntimeError(msg) diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 3e429d4c7..3742fa5a0 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -59,11 +59,7 @@ def components(self) -> list[SerializeAsAny[PipelineComponent]]: def last(self) -> PipelineComponent: return self.components[-1] - def append(self, pipeline: Pipeline) -> None: - for component in pipeline.components: - self.add(component) - - def add_all(self, pipeline: Pipeline) -> None: + def extend(self, pipeline: Pipeline) -> None: for component in pipeline.components: self.add(component) From b543c18f26c981db560cc558f0d8bbe1b392ea10 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 13 May 2024 10:25:34 +0200 Subject: [PATCH 09/21] Add teset for graph execution --- .../parallel-pipeline-folders/config.yaml | 15 +++++++ .../parallel-pipeline-folders/defaults.yaml | 27 ++++++++++++ .../producers/pipeline.yaml | 23 ++++++++++ .../sink-connector/pipeline.yaml | 27 ++++++++++++ .../streams-app/pipeline.yaml | 12 ++++++ .../pipeline-1/pipeline.yaml | 0 .../pipeline-2/pipeline.yaml | 0 .../pipeline-3/pipeline.yaml | 0 tests/pipeline/test_generate.py | 43 ++++++++++++++++++- 9 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 tests/pipeline/resources/parallel-pipeline-folders/config.yaml create mode 100644 tests/pipeline/resources/parallel-pipeline-folders/defaults.yaml create mode 100644 tests/pipeline/resources/parallel-pipeline-folders/producers/pipeline.yaml create mode 100644 tests/pipeline/resources/parallel-pipeline-folders/sink-connector/pipeline.yaml create mode 100644 tests/pipeline/resources/parallel-pipeline-folders/streams-app/pipeline.yaml rename tests/pipeline/resources/{pipeline-folder => pipeline-folders}/pipeline-1/pipeline.yaml (100%) rename tests/pipeline/resources/{pipeline-folder => pipeline-folders}/pipeline-2/pipeline.yaml (100%) rename tests/pipeline/resources/{pipeline-folder => pipeline-folders}/pipeline-3/pipeline.yaml (100%) diff --git a/tests/pipeline/resources/parallel-pipeline-folders/config.yaml b/tests/pipeline/resources/parallel-pipeline-folders/config.yaml new file mode 100644 index 000000000..1c3b4443f --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline-folders/config.yaml @@ -0,0 +1,15 @@ +topic_name_config: + default_error_topic_name: ${component.name}-dead-letter-topic + default_output_topic_name: ${component.name}-test-topic + +schema_registry: + enabled: true + url: "http://localhost:8081" + +kafka_connect: + url: "http://kafka_connect_url:8083" +kafka_rest: + url: "http://kafka_rest_url:8082" + +defaults_path: .. +kafka_brokers: "broker:9092" diff --git a/tests/pipeline/resources/parallel-pipeline-folders/defaults.yaml b/tests/pipeline/resources/parallel-pipeline-folders/defaults.yaml new file mode 100644 index 000000000..4b6cd0c91 --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline-folders/defaults.yaml @@ -0,0 +1,27 @@ +pipeline-component: + prefix: "" + +kubernetes-app: + namespace: ${NAMESPACE} + +kafka-connector: + namespace: ${NAMESPACE} + +kafka-app: + app: + streams: + brokers: ${config.kafka_brokers} + schemaRegistryUrl: ${config.schema_registry.url} + +streams-app: + app: + labels: + pipeline: ${pipeline.name} + to: + topics: + ${error_topic_name}: + type: error + partitions_count: 1 + ${output_topic_name}: + type: output + partitions_count: 3 diff --git a/tests/pipeline/resources/parallel-pipeline-folders/producers/pipeline.yaml b/tests/pipeline/resources/parallel-pipeline-folders/producers/pipeline.yaml new file mode 100644 index 000000000..b3fa5f625 --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline-folders/producers/pipeline.yaml @@ -0,0 +1,23 @@ +- type: producer-app + name: transaction-avro-producer-1 + to: + topics: + my-output-topic-with-multiple-producers: + type: output + partitions_count: 3 + +- type: producer-app + name: transaction-avro-producer-2 + to: + topics: + my-output-topic-with-multiple-producers: + type: output + partitions_count: 3 + +- type: producer-app + name: transaction-avro-producer-3 + to: + topics: + my-output-topic-with-multiple-producers: + type: output + partitions_count: 3 diff --git a/tests/pipeline/resources/parallel-pipeline-folders/sink-connector/pipeline.yaml b/tests/pipeline/resources/parallel-pipeline-folders/sink-connector/pipeline.yaml new file mode 100644 index 000000000..2fa731960 --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline-folders/sink-connector/pipeline.yaml @@ -0,0 +1,27 @@ +- type: kafka-sink-connector + name: s3-connector-1 + from: + topics: + account-linker-test-topic: + type: input + app: + connector.class: io.confluent.connect.s3.S3SinkConnector + + +- type: kafka-sink-connector + name: s3-connector-2 + from: + topics: + account-linker-test-topic: + type: input + app: + connector.class: io.confluent.connect.s3.S3SinkConnector + +- type: kafka-sink-connector + name: s3-connector-3 + from: + topics: + account-linker-test-topic: + type: input + app: + connector.class: io.confluent.connect.s3.S3SinkConnector diff --git a/tests/pipeline/resources/parallel-pipeline-folders/streams-app/pipeline.yaml b/tests/pipeline/resources/parallel-pipeline-folders/streams-app/pipeline.yaml new file mode 100644 index 000000000..a102d4e5b --- /dev/null +++ b/tests/pipeline/resources/parallel-pipeline-folders/streams-app/pipeline.yaml @@ -0,0 +1,12 @@ +- type: streams-app + name: transaction-joiner + +- type: streams-app + name: fraud-detector + +- type: streams-app + name: account-linker + from: + components: + fraud-detector: + type: input diff --git a/tests/pipeline/resources/pipeline-folder/pipeline-1/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml similarity index 100% rename from tests/pipeline/resources/pipeline-folder/pipeline-1/pipeline.yaml rename to tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml diff --git a/tests/pipeline/resources/pipeline-folder/pipeline-2/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml similarity index 100% rename from tests/pipeline/resources/pipeline-folder/pipeline-2/pipeline.yaml rename to tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml diff --git a/tests/pipeline/resources/pipeline-folder/pipeline-3/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml similarity index 100% rename from tests/pipeline/resources/pipeline-folder/pipeline-3/pipeline.yaml rename to tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index f19f3b2c0..a4e0de272 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -84,7 +84,7 @@ def test_load_pipeline_with_folder_path(self, snapshot: Snapshot): app, [ "generate", - str(RESOURCE_PATH / "pipeline-folder"), + str(RESOURCE_PATH / "pipeline-folders"), ], catch_exceptions=False, ) @@ -719,6 +719,47 @@ async def name_runner(component: PipelineComponent): mock.call("s3-connector-1"), ] + @pytest.mark.asyncio() + async def test_pipeline_folders_parallel_execution_graph(self): + pipeline = kpops.generate( + RESOURCE_PATH / "parallel-pipeline-folders", + config=RESOURCE_PATH / "parallel-pipeline", + ) + + called_component = AsyncMock() + + sleep_table_components = { + "transaction-avro-producer-1": 1, + "transaction-avro-producer-2": 0, + "transaction-avro-producer-3": 2, + "transaction-joiner": 3, + "fraud-detector": 2, + "account-linker": 0, + "s3-connector-1": 2, + "s3-connector-2": 1, + "s3-connector-3": 0, + } + + async def name_runner(component: PipelineComponent): + await asyncio.sleep(sleep_table_components[component.name]) + await called_component(component.name) + + execution_graph = pipeline.build_execution_graph(name_runner) + + await execution_graph + + assert called_component.mock_calls == [ + mock.call("transaction-avro-producer-2"), + mock.call("transaction-avro-producer-1"), + mock.call("transaction-avro-producer-3"), + mock.call("transaction-joiner"), + mock.call("fraud-detector"), + mock.call("account-linker"), + mock.call("s3-connector-3"), + mock.call("s3-connector-2"), + mock.call("s3-connector-1"), + ] + @pytest.mark.asyncio() async def test_subgraph_execution(self): pipeline = kpops.generate( From 06bdaf9f7ba6759d10331ca2028b1138cb2bb944 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 13 May 2024 10:37:21 +0200 Subject: [PATCH 10/21] Add return --- kpops/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 3742fa5a0..84d712c94 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -81,7 +81,7 @@ def remove(self, component_id: str) -> None: self._component_index.pop(component_id) def get(self, component_id: str) -> PipelineComponent | None: - self._component_index.get(component_id) + return self._component_index.get(component_id) def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]: """Find pipeline components matching a custom predicate. From 8c72d7e9f103ecc1c5312769b23ad97719abd9cd Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 14:57:19 +0200 Subject: [PATCH 11/21] Separate KPOps API from the CLI --- docs/docs/user/references/cli-commands.md | 1 - kpops/__init__.py | 3 +- kpops/api.py | 47 +++++++ kpops/cli/main.py | 121 +++--------------- kpops/cli/options.py | 23 ++++ .../streams_bootstrap/streams/model.py | 2 +- kpops/config.py | 20 +++ kpops/exception.py | 9 ++ kpops/pipeline.py | 50 ++++++-- tests/cli/test_handlers.py | 20 +-- tests/components/test_streams_app.py | 2 +- tests/pipeline/test_generate.py | 13 +- tests/pipeline/test_pipeline.py | 17 +-- 13 files changed, 179 insertions(+), 149 deletions(-) create mode 100644 kpops/api.py create mode 100644 kpops/exception.py diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 3c46cb11b..7225c89ad 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -120,7 +120,6 @@ $ kpops generate [OPTIONS] PIPELINE_PATH * `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH] * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] -* `--output / --no-output`: Enable output printing [default: output] * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS] * `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE] * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] diff --git a/kpops/__init__.py b/kpops/__init__.py index 34ff74a89..474b65c61 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,7 +1,8 @@ __version__ = "5.0.1" # export public API functions -from kpops.cli.main import clean, deploy, destroy, generate, init, manifest, reset +from kpops.api import generate +from kpops.cli.main import clean, deploy, destroy, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api.py b/kpops/api.py new file mode 100644 index 000000000..7c8190699 --- /dev/null +++ b/kpops/api.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +import logging +from pathlib import Path + +from kpops.cli.options import FilterType +from kpops.config import KpopsConfig +from kpops.pipeline import ( + Pipeline, +) + +log = logging.getLogger("KPOpsAPI") + + +def parse_steps(steps: str) -> set[str]: + return set(steps.split(",")) + + +def generate( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> Pipeline: + kpops_config = KpopsConfig.create( + config, + dotenv, + environment, + verbose, + ) + pipeline = Pipeline.create(pipeline_path, kpops_config, environment) + log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") + if steps: + component_names = parse_steps(steps) + log.debug( + f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" + ) + + predicate = filter_type.create_default_step_names_filter_predicate( + component_names + ) + pipeline.filter(predicate) + log.info(f"Filtered pipeline:\n{pipeline.step_names}") + return pipeline diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 6bdcd8dc4..911071bcd 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -8,20 +8,12 @@ import dtyper import typer +import kpops from kpops import __version__ from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType -from kpops.cli.registry import Registry -from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) -from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler -from kpops.component_handlers.topic.handler import TopicHandler -from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.components.base_components.models.resource import Resource from kpops.config import ENV_PREFIX, KpopsConfig -from kpops.pipeline import ComponentFilterPredicate, Pipeline, PipelineGenerator from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( SchemaScope, @@ -29,7 +21,6 @@ gen_defaults_schema, gen_pipeline_schema, ) -from kpops.utils.pydantic import YamlConfigSettingsSource from kpops.utils.yaml import print_yaml if TYPE_CHECKING: @@ -135,55 +126,10 @@ log = logging.getLogger("") -def setup_pipeline( - pipeline_path: Path, - kpops_config: KpopsConfig, - environment: str | None, -) -> Pipeline: - registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") - - handlers = setup_handlers(kpops_config) - parser = PipelineGenerator(kpops_config, registry, handlers) - return parser.load_yaml(pipeline_path, environment) - - -def setup_handlers(config: KpopsConfig) -> ComponentHandlers: - schema_handler = SchemaHandler.load_schema_handler(config) - connector_handler = KafkaConnectHandler.from_kpops_config(config) - proxy_wrapper = ProxyWrapper(config.kafka_rest) - topic_handler = TopicHandler(proxy_wrapper) - - return ComponentHandlers(schema_handler, connector_handler, topic_handler) - - -def setup_logging_level(verbose: bool): - logging.getLogger().setLevel(logging.DEBUG if verbose else logging.INFO) - - def parse_steps(steps: str) -> set[str]: return set(steps.split(",")) -def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool: - return component.name in component_names - - -def create_default_step_names_filter_predicate( - component_names: set[str], filter_type: FilterType -) -> ComponentFilterPredicate: - def predicate(component: PipelineComponent) -> bool: - match filter_type, is_in_steps(component, component_names): - case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True): - return False - case _: - return True - - return predicate - - def log_action(action: str, pipeline_component: PipelineComponent): log.info("\n") log.info(LOG_DIVIDER) @@ -192,20 +138,6 @@ def log_action(action: str, pipeline_component: PipelineComponent): log.info("\n") -def create_kpops_config( - config: Path, - dotenv: list[Path] | None = None, - environment: str | None = None, - verbose: bool = False, -) -> KpopsConfig: - setup_logging_level(verbose) - YamlConfigSettingsSource.config_dir = config - YamlConfigSettingsSource.environment = environment - return KpopsConfig( - _env_file=dotenv # pyright: ignore[reportCallIssue] - ) - - @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 help="Initialize a new KPOps project." ) @@ -246,12 +178,12 @@ def schema( ) -> None: match scope: case SchemaScope.PIPELINE: - kpops_config = create_kpops_config(config) + kpops_config = KpopsConfig.create(config) gen_pipeline_schema( kpops_config.components_module, include_stock_components ) case SchemaScope.DEFAULTS: - kpops_config = create_kpops_config(config) + kpops_config = KpopsConfig.create(config) gen_defaults_schema( kpops_config.components_module, include_stock_components ) @@ -267,39 +199,21 @@ def generate( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, - output: bool = OUTPUT_OPTION, steps: Optional[str] = PIPELINE_STEPS, filter_type: FilterType = FILTER_TYPE, environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, -) -> Pipeline: - kpops_config = create_kpops_config( - config, +): + pipeline = kpops.generate( + pipeline_path, dotenv, + config, + steps, + filter_type, environment, verbose, ) - - pipeline = setup_pipeline(pipeline_path, kpops_config, environment) - - if steps: - component_names = parse_steps(steps) - log.debug( - f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" - ) - - predicate = create_default_step_names_filter_predicate( - component_names, filter_type - ) - pipeline.filter(predicate) - - def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]: - return [step.name for step in steps_to_apply] - - log.info(f"Filtered pipeline:\n{get_step_names(pipeline.components)}") - if output: - print_yaml(pipeline.to_yaml()) - return pipeline + print_yaml(pipeline.to_yaml()) @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 @@ -316,11 +230,10 @@ def manifest( environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, ) -> list[Resource]: - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -348,11 +261,10 @@ def deploy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -386,11 +298,10 @@ def destroy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -426,11 +337,10 @@ def reset( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, @@ -465,11 +375,10 @@ def clean( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = generate( + pipeline = kpops.generate( pipeline_path=pipeline_path, dotenv=dotenv, config=config, - output=False, steps=steps, filter_type=filter_type, environment=environment, diff --git a/kpops/cli/options.py b/kpops/cli/options.py index ac176d986..dc116bd35 100644 --- a/kpops/cli/options.py +++ b/kpops/cli/options.py @@ -1,6 +1,29 @@ +from __future__ import annotations + from enum import Enum +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from kpops.components import PipelineComponent + from kpops.pipeline import ComponentFilterPredicate class FilterType(str, Enum): INCLUDE = "include" EXCLUDE = "exclude" + + @staticmethod + def is_in_steps(component: PipelineComponent, component_names: set[str]) -> bool: + return component.name in component_names + + def create_default_step_names_filter_predicate( + self, component_names: set[str] + ) -> ComponentFilterPredicate: + def predicate(component: PipelineComponent) -> bool: + match self, FilterType.is_in_steps(component, component_names): + case (FilterType.INCLUDE, False) | (FilterType.EXCLUDE, True): + return False + case _: + return True + + return predicate diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index df2348773..97394259a 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -10,7 +10,7 @@ KafkaStreamsConfig, ) from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr -from kpops.pipeline import ValidationError +from kpops.exception import ValidationError from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/config.py b/kpops/config.py index f82068c96..b4a482f6b 100644 --- a/kpops/config.py +++ b/kpops/config.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from pathlib import Path from pydantic import AnyHttpUrl, Field, TypeAdapter @@ -127,6 +128,25 @@ class KpopsConfig(BaseSettings): model_config = SettingsConfigDict(env_prefix=ENV_PREFIX, env_nested_delimiter="__") + @classmethod + def create( + cls, + config: Path, + dotenv: list[Path] | None = None, + environment: str | None = None, + verbose: bool = False, + ) -> KpopsConfig: + cls.setup_logging_level(verbose) + YamlConfigSettingsSource.config_dir = config + YamlConfigSettingsSource.environment = environment + return KpopsConfig( + _env_file=dotenv # pyright: ignore[reportCallIssue] + ) + + @staticmethod + def setup_logging_level(verbose: bool): + logging.getLogger().setLevel(logging.DEBUG if verbose else logging.INFO) + @override @classmethod def settings_customise_sources( diff --git a/kpops/exception.py b/kpops/exception.py new file mode 100644 index 000000000..713274f3a --- /dev/null +++ b/kpops/exception.py @@ -0,0 +1,9 @@ +from __future__ import annotations + + +class ValidationError(Exception): + pass + + +class ParsingException(Exception): + pass diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 3f4f698b9..a218f335e 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -15,7 +15,16 @@ computed_field, ) +from kpops.cli.registry import Registry +from kpops.component_handlers import ComponentHandlers +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) +from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler +from kpops.component_handlers.topic.handler import TopicHandler +from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.exception import ParsingException, ValidationError from kpops.utils.dict_ops import update_nested_pair from kpops.utils.environment import ENV, PIPELINE_PATH from kpops.utils.yaml import load_yaml_file @@ -24,21 +33,10 @@ from collections.abc import Awaitable, Coroutine, Iterator from pathlib import Path - from kpops.cli.registry import Registry - from kpops.component_handlers import ComponentHandlers from kpops.config import KpopsConfig log = logging.getLogger("PipelineGenerator") - -class ParsingException(Exception): - pass - - -class ValidationError(Exception): - pass - - ComponentFilterPredicate: TypeAlias = Callable[[PipelineComponent], bool] @@ -50,6 +48,36 @@ class Pipeline(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) + @classmethod + def create( + cls, + pipeline_path: Path, + kpops_config: KpopsConfig, + environment: str | None, + ) -> Pipeline: + registry = Registry() + if kpops_config.components_module: + registry.find_components(kpops_config.components_module) + registry.find_components("kpops.components") + + handlers = cls.setup_handlers(kpops_config) + parser = PipelineGenerator(kpops_config, registry, handlers) + return parser.load_yaml(pipeline_path, environment) + + @staticmethod + def setup_handlers(config: KpopsConfig) -> ComponentHandlers: + schema_handler = SchemaHandler.load_schema_handler(config) + connector_handler = KafkaConnectHandler.from_kpops_config(config) + proxy_wrapper = ProxyWrapper(config.kafka_rest) + topic_handler = TopicHandler(proxy_wrapper) + + return ComponentHandlers(schema_handler, connector_handler, topic_handler) + + @computed_field(title="Step Names") + @property + def step_names(self) -> list[str]: + return [step.name for step in self.components] + @computed_field(title="Components") @property def components(self) -> list[SerializeAsAny[PipelineComponent]]: diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index 7ba0651a5..f873f6731 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -1,6 +1,5 @@ from pytest_mock import MockerFixture -from kpops.cli.main import setup_handlers from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, @@ -8,6 +7,7 @@ from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from kpops.config import KpopsConfig, SchemaRegistryConfig +from kpops.pipeline import Pipeline from tests.cli.resources.custom_module import CustomSchemaProvider MODULE = CustomSchemaProvider.__module__ @@ -18,12 +18,12 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): kafka_brokers="broker:9092", components_module=MODULE, ) - connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") - wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") + topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") + wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -33,7 +33,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = setup_handlers(config) + actual_handlers = Pipeline.setup_handlers(config) connector_handler_mock.from_kpops_config.assert_called_once_with(config) @@ -51,16 +51,16 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): schema_registry=SchemaRegistryConfig(enabled=True), kafka_brokers="broker:9092", ) - schema_handler_mock = mocker.patch("kpops.cli.main.SchemaHandler") + schema_handler_mock = mocker.patch("kpops.pipeline.SchemaHandler") schema_handler = SchemaHandler.load_schema_handler(config) schema_handler_mock.load_schema_handler.return_value = schema_handler - connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") + connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") - wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") + topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") + wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -70,7 +70,7 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = setup_handlers(config) + actual_handlers = Pipeline.setup_handlers(config) schema_handler_mock.load_schema_handler.assert_called_once_with(config) diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 7af0634d7..d6a0f0d87 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -29,7 +29,7 @@ StreamsAppCleaner, ) from kpops.config import KpopsConfig, TopicNameConfig -from kpops.pipeline import ValidationError +from kpops.exception import ValidationError from tests.components import PIPELINE_BASE_DIR RESOURCES_PATH = Path(__file__).parent / "resources" diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 8b237f705..9ce6240fd 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -12,7 +12,7 @@ import kpops from kpops.cli.main import FilterType, app from kpops.components import KafkaSinkConnector, PipelineComponent -from kpops.pipeline import ParsingException, ValidationError +from kpops.exception import ParsingException, ValidationError runner = CliRunner() @@ -23,12 +23,11 @@ class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: - return mocker.patch("kpops.cli.main.log.info") + return mocker.patch("kpops.api.log.info") def test_python_api(self): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, ) assert len(pipeline) == 3 assert [component.type for component in pipeline.components] == [ @@ -40,25 +39,25 @@ def test_python_api(self): def test_python_api_filter_include(self, log_info: MagicMock): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, steps="converter", filter_type=FilterType.INCLUDE, ) assert len(pipeline) == 1 assert pipeline.components[0].type == "converter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call("Filtered pipeline:\n['converter']") def test_python_api_filter_exclude(self, log_info: MagicMock): pipeline = kpops.generate( RESOURCE_PATH / "first-pipeline" / "pipeline.yaml", - output=False, steps="converter,scheduled-producer", filter_type=FilterType.EXCLUDE, ) assert len(pipeline) == 1 assert pipeline.components[0].type == "filter" - assert log_info.call_count == 1 + assert log_info.call_count == 2 + log_info.assert_any_call("Picked up pipeline 'first-pipeline'") log_info.assert_any_call( "Filtered pipeline:\n['a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name']" ) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index d0c53667b..6f79c8f37 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -3,7 +3,6 @@ import pytest from polyfactory.factories.pydantic_factory import ModelFactory -from kpops.cli.main import create_default_step_names_filter_predicate from kpops.cli.options import FilterType from kpops.component_handlers import ( ComponentHandlers, @@ -44,8 +43,8 @@ def pipeline(self) -> Pipeline: return pipeline def test_filter_include(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - {"example2", "example3"}, FilterType.INCLUDE + predicate = FilterType.INCLUDE.create_default_step_names_filter_predicate( + {"example2", "example3"} ) pipeline.filter(predicate) assert len(pipeline.components) == 2 @@ -53,23 +52,19 @@ def test_filter_include(self, pipeline: Pipeline): assert test_component_3 in pipeline.components def test_filter_include_empty(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - set(), FilterType.INCLUDE - ) + predicate = FilterType.INCLUDE.create_default_step_names_filter_predicate(set()) pipeline.filter(predicate) assert len(pipeline.components) == 0 def test_filter_exclude(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - {"example2", "example3"}, FilterType.EXCLUDE + predicate = FilterType.EXCLUDE.create_default_step_names_filter_predicate( + {"example2", "example3"} ) pipeline.filter(predicate) assert len(pipeline.components) == 1 assert test_component_1 in pipeline.components def test_filter_exclude_empty(self, pipeline: Pipeline): - predicate = create_default_step_names_filter_predicate( - set(), FilterType.EXCLUDE - ) + predicate = FilterType.EXCLUDE.create_default_step_names_filter_predicate(set()) pipeline.filter(predicate) assert len(pipeline.components) == 3 From d3d66fcee0c35aacde527e64c7e81c20f42574d9 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 16:00:02 +0200 Subject: [PATCH 12/21] Separate KPOps API from the CLI --- docs/docs/user/references/cli-commands.md | 1 - kpops/__init__.py | 4 +-- kpops/api.py | 30 +++++++++++++++++++ kpops/cli/main.py | 16 +++------- .../base_components/models/resource.py | 4 +-- tests/pipeline/test_manifest.py | 1 - 6 files changed, 38 insertions(+), 18 deletions(-) diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 7225c89ad..62d940123 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -163,7 +163,6 @@ $ kpops manifest [OPTIONS] PIPELINE_PATH * `--dotenv FILE`: Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. [env var: KPOPS_DOTENV_PATH] * `--config DIRECTORY`: Path to the dir containing config.yaml files [env var: KPOPS_CONFIG_PATH; default: .] -* `--output / --no-output`: Enable output printing [default: output] * `--steps TEXT`: Comma separated list of steps to apply the command on [env var: KPOPS_PIPELINE_STEPS] * `--filter-type [include|exclude]`: Whether the --steps option should include/exclude the steps [default: FilterType.INCLUDE] * `--environment TEXT`: The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). [env var: KPOPS_ENVIRONMENT] diff --git a/kpops/__init__.py b/kpops/__init__.py index 474b65c61..af246bb97 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,8 +1,8 @@ __version__ = "5.0.1" # export public API functions -from kpops.api import generate -from kpops.cli.main import clean, deploy, destroy, init, manifest, reset +from kpops.api import generate, manifest +from kpops.cli.main import clean, deploy, destroy, init, reset __all__ = ( "generate", diff --git a/kpops/api.py b/kpops/api.py index 7c8190699..37af128b9 100644 --- a/kpops/api.py +++ b/kpops/api.py @@ -2,13 +2,18 @@ import logging from pathlib import Path +from typing import TYPE_CHECKING +import kpops from kpops.cli.options import FilterType from kpops.config import KpopsConfig from kpops.pipeline import ( Pipeline, ) +if TYPE_CHECKING: + from kpops.components.base_components.models.resource import Resource + log = logging.getLogger("KPOpsAPI") @@ -45,3 +50,28 @@ def generate( pipeline.filter(predicate) log.info(f"Filtered pipeline:\n{pipeline.step_names}") return pipeline + + +def manifest( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = False, +) -> list[Resource]: + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + resources: list[Resource] = [] + for component in pipeline.components: + resource = component.manifest() + resources.append(resource) + return resources diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 911071bcd..425a0c00b 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -12,7 +12,6 @@ from kpops import __version__ from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType -from kpops.components.base_components.models.resource import Resource from kpops.config import ENV_PREFIX, KpopsConfig from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( @@ -224,13 +223,12 @@ def manifest( pipeline_path: Path = PIPELINE_PATH_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, - output: bool = OUTPUT_OPTION, steps: Optional[str] = PIPELINE_STEPS, filter_type: FilterType = FILTER_TYPE, environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, -) -> list[Resource]: - pipeline = kpops.generate( +): + resources = kpops.manifest( pipeline_path=pipeline_path, dotenv=dotenv, config=config, @@ -239,14 +237,8 @@ def manifest( environment=environment, verbose=verbose, ) - resources: list[Resource] = [] - for component in pipeline.components: - resource = component.manifest() - resources.append(resource) - if output: - for manifest in resource: - print_yaml(manifest) - return resources + for rendered in resources: + print_yaml(rendered) @app.command(help="Deploy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 diff --git a/kpops/components/base_components/models/resource.py b/kpops/components/base_components/models/resource.py index 08c01f344..fc0eff7a1 100644 --- a/kpops/components/base_components/models/resource.py +++ b/kpops/components/base_components/models/resource.py @@ -1,5 +1,5 @@ -from collections.abc import Mapping, Sequence +from collections.abc import Mapping from typing import Any, TypeAlias # representation of final resource for component, e.g. a list of Kubernetes manifests -Resource: TypeAlias = Sequence[Mapping[str, Any]] +Resource: TypeAlias = Mapping[str, Any] diff --git a/tests/pipeline/test_manifest.py b/tests/pipeline/test_manifest.py index 8dc58ae93..d12dc9172 100644 --- a/tests/pipeline/test_manifest.py +++ b/tests/pipeline/test_manifest.py @@ -101,7 +101,6 @@ def test_custom_config(self, mock_execute: MagicMock): def test_python_api(self, snapshot: Snapshot): resources = kpops.manifest( RESOURCE_PATH / "custom-config/pipeline.yaml", - output=False, environment="development", ) assert isinstance(resources, list) From d1d102e8f9dfcbc6d26d208757459f1944c58f14 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 16:18:29 +0200 Subject: [PATCH 13/21] Move all cli to api --- kpops/__init__.py | 3 +- kpops/api.py | 181 +++++++++++++++++++++++++++++++++++++++++++++- kpops/cli/main.py | 116 ++++------------------------- 3 files changed, 195 insertions(+), 105 deletions(-) diff --git a/kpops/__init__.py b/kpops/__init__.py index af246bb97..ab41f32b2 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,8 +1,7 @@ __version__ = "5.0.1" # export public API functions -from kpops.api import generate, manifest -from kpops.cli.main import clean, deploy, destroy, init, reset +from kpops.api import clean, deploy, destroy, generate, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api.py b/kpops/api.py index 37af128b9..c85ced1a0 100644 --- a/kpops/api.py +++ b/kpops/api.py @@ -1,20 +1,39 @@ from __future__ import annotations +import asyncio import logging from pathlib import Path from typing import TYPE_CHECKING import kpops +from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType from kpops.config import KpopsConfig from kpops.pipeline import ( Pipeline, ) +from kpops.utils.cli_commands import init_project if TYPE_CHECKING: + from kpops.components import PipelineComponent from kpops.components.base_components.models.resource import Resource -log = logging.getLogger("KPOpsAPI") +logger = logging.getLogger() +logging.getLogger("httpx").setLevel(logging.WARNING) +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(CustomFormatter()) +logger.addHandler(stream_handler) + +log = logging.getLogger("") +LOG_DIVIDER = "#" * 100 + + +def log_action(action: str, pipeline_component: PipelineComponent): + log.info("\n") + log.info(LOG_DIVIDER) + log.info(f"{action} {pipeline_component.name}") + log.info(LOG_DIVIDER) + log.info("\n") def parse_steps(steps: str) -> set[str]: @@ -75,3 +94,163 @@ def manifest( resource = component.manifest() resources.append(resource) return resources + + +def deploy( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def deploy_runner(component: PipelineComponent): + log_action("Deploy", component) + await component.deploy(dry_run) + + async def async_deploy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(deploy_runner) + await pipeline_tasks + else: + for component in pipeline.components: + await deploy_runner(component) + + asyncio.run(async_deploy()) + + +def destroy( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def destroy_runner(component: PipelineComponent): + log_action("Destroy", component) + await component.destroy(dry_run) + + async def async_destroy(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph( + destroy_runner, reverse=True + ) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await destroy_runner(component) + + asyncio.run(async_destroy()) + + +def reset( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def reset_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Reset", component) + await component.reset(dry_run) + + async def async_reset(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await reset_runner(component) + + asyncio.run(async_reset()) + + +def clean( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: str | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + dry_run: bool = True, + verbose: bool = True, + parallel: bool = False, +): + pipeline = kpops.generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + + async def clean_runner(component: PipelineComponent): + await component.destroy(dry_run) + log_action("Clean", component) + await component.clean(dry_run) + + async def async_clean(): + if parallel: + pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) + await pipeline_tasks + else: + for component in reversed(pipeline.components): + await clean_runner(component) + + asyncio.run(async_clean()) + + +def init( + path: Path, + config_include_opt: bool = False, +): + if not path.exists(): + path.mkdir(parents=False) + elif next(path.iterdir(), False): + log.warning("Please provide a path to an empty directory.") + return + init_project(path, config_include_opt) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 425a0c00b..a364b3a5b 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -1,19 +1,15 @@ from __future__ import annotations -import asyncio -import logging from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import Optional import dtyper import typer import kpops from kpops import __version__ -from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType from kpops.config import ENV_PREFIX, KpopsConfig -from kpops.utils.cli_commands import init_project from kpops.utils.gen_schema import ( SchemaScope, gen_config_schema, @@ -22,12 +18,6 @@ ) from kpops.utils.yaml import print_yaml -if TYPE_CHECKING: - from kpops.components import PipelineComponent - - -LOG_DIVIDER = "#" * 100 - app = dtyper.Typer(pretty_exceptions_enable=False) DOTENV_PATH_OPTION: Optional[list[Path]] = typer.Option( @@ -116,27 +106,6 @@ ) -logger = logging.getLogger() -logging.getLogger("httpx").setLevel(logging.WARNING) -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(CustomFormatter()) -logger.addHandler(stream_handler) - -log = logging.getLogger("") - - -def parse_steps(steps: str) -> set[str]: - return set(steps.split(",")) - - -def log_action(action: str, pipeline_component: PipelineComponent): - log.info("\n") - log.info(LOG_DIVIDER) - log.info(f"{action} {pipeline_component.name}") - log.info(LOG_DIVIDER) - log.info("\n") - - @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 help="Initialize a new KPOps project." ) @@ -144,12 +113,7 @@ def init( path: Path = PROJECT_PATH, config_include_opt: bool = CONFIG_INCLUDE_OPTIONAL, ): - if not path.exists(): - path.mkdir(parents=False) - elif next(path.iterdir(), False): - log.warning("Please provide a path to an empty directory.") - return - init_project(path, config_include_opt) + kpops.init(path, config_include_opt=config_include_opt) @app.command( # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 @@ -253,30 +217,18 @@ def deploy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.deploy( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def deploy_runner(component: PipelineComponent): - log_action("Deploy", component) - await component.deploy(dry_run) - - async def async_deploy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(deploy_runner) - await pipeline_tasks - else: - for component in pipeline.components: - await deploy_runner(component) - - asyncio.run(async_deploy()) - @app.command(help="Destroy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 def destroy( @@ -290,32 +242,18 @@ def destroy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.destroy( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def destroy_runner(component: PipelineComponent): - log_action("Destroy", component) - await component.destroy(dry_run) - - async def async_destroy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph( - destroy_runner, reverse=True - ) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await destroy_runner(component) - - asyncio.run(async_destroy()) - @app.command(help="Reset pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 def reset( @@ -329,31 +267,18 @@ def reset( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.reset( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def reset_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Reset", component) - await component.reset(dry_run) - - async def async_reset(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await reset_runner(component) - - asyncio.run(async_reset()) - @app.command(help="Clean pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 def clean( @@ -367,31 +292,18 @@ def clean( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - pipeline = kpops.generate( + kpops.clean( pipeline_path=pipeline_path, dotenv=dotenv, config=config, steps=steps, filter_type=filter_type, environment=environment, + dry_run=dry_run, verbose=verbose, + parallel=parallel, ) - async def clean_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Clean", component) - await component.clean(dry_run) - - async def async_clean(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await clean_runner(component) - - asyncio.run(async_clean()) - def version_callback(show_version: bool) -> None: if show_version: From 3172bdb85b89dbb5f09f86da502410c79653d298 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 21 May 2024 16:26:53 +0200 Subject: [PATCH 14/21] Fix pyright errors --- kpops/cli/main.py | 5 +++-- kpops/components/base_components/models/resource.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/kpops/cli/main.py b/kpops/cli/main.py index a364b3a5b..d15934e15 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -201,8 +201,9 @@ def manifest( environment=environment, verbose=verbose, ) - for rendered in resources: - print_yaml(rendered) + for resource in resources: + for rendered_manifest in resource: + print_yaml(rendered_manifest) @app.command(help="Deploy pipeline steps") # pyright: ignore[reportCallIssue] https://github.com/rec/dtyper/issues/8 diff --git a/kpops/components/base_components/models/resource.py b/kpops/components/base_components/models/resource.py index fc0eff7a1..08c01f344 100644 --- a/kpops/components/base_components/models/resource.py +++ b/kpops/components/base_components/models/resource.py @@ -1,5 +1,5 @@ -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from typing import Any, TypeAlias # representation of final resource for component, e.g. a list of Kubernetes manifests -Resource: TypeAlias = Mapping[str, Any] +Resource: TypeAlias = Sequence[Mapping[str, Any]] From d61cb04d9da71e8e3c4cbb72c02ee0718afd1a38 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 09:56:59 +0200 Subject: [PATCH 15/21] remove computed field --- kpops/pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 7bfc80ee7..fee22ac82 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -73,7 +73,6 @@ def setup_handlers(config: KpopsConfig) -> ComponentHandlers: return ComponentHandlers(schema_handler, connector_handler, topic_handler) - @computed_field(title="Step Names") @property def step_names(self) -> list[str]: return [step.name for step in self.components] From aae9749318029bf12a6ed284d54f6fd7833e475f Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 10:16:57 +0200 Subject: [PATCH 16/21] address reviews --- kpops/api.py | 39 ++++++++++++++++++++++++++++++++++++-- kpops/pipeline.py | 31 ------------------------------ tests/cli/test_handlers.py | 22 +++++++++++---------- 3 files changed, 49 insertions(+), 43 deletions(-) diff --git a/kpops/api.py b/kpops/api.py index c85ced1a0..69be4fdb2 100644 --- a/kpops/api.py +++ b/kpops/api.py @@ -8,15 +8,26 @@ import kpops from kpops.cli.custom_formatter import CustomFormatter from kpops.cli.options import FilterType +from kpops.cli.registry import Registry +from kpops.component_handlers import ComponentHandlers +from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, +) +from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler +from kpops.component_handlers.topic.handler import TopicHandler +from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.config import KpopsConfig from kpops.pipeline import ( - Pipeline, + PipelineGenerator, ) from kpops.utils.cli_commands import init_project if TYPE_CHECKING: from kpops.components import PipelineComponent from kpops.components.base_components.models.resource import Resource + from kpops.pipeline import ( + Pipeline, + ) logger = logging.getLogger() logging.getLogger("httpx").setLevel(logging.WARNING) @@ -55,7 +66,7 @@ def generate( environment, verbose, ) - pipeline = Pipeline.create(pipeline_path, kpops_config, environment) + pipeline = create_pipeline(pipeline_path, kpops_config, environment) log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") if steps: component_names = parse_steps(steps) @@ -254,3 +265,27 @@ def init( log.warning("Please provide a path to an empty directory.") return init_project(path, config_include_opt) + + +def create_pipeline( + pipeline_path: Path, + kpops_config: KpopsConfig, + environment: str | None, +) -> Pipeline: + registry = Registry() + if kpops_config.components_module: + registry.find_components(kpops_config.components_module) + registry.find_components("kpops.components") + + handlers = setup_handlers(kpops_config) + parser = PipelineGenerator(kpops_config, registry, handlers) + return parser.load_yaml(pipeline_path, environment) + + +def setup_handlers(config: KpopsConfig) -> ComponentHandlers: + schema_handler = SchemaHandler.load_schema_handler(config) + connector_handler = KafkaConnectHandler.from_kpops_config(config) + proxy_wrapper = ProxyWrapper(config.kafka_rest) + topic_handler = TopicHandler(proxy_wrapper) + + return ComponentHandlers(schema_handler, connector_handler, topic_handler) diff --git a/kpops/pipeline.py b/kpops/pipeline.py index fee22ac82..1fbe3bfe6 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -17,12 +17,6 @@ from kpops.cli.registry import Registry from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) -from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler -from kpops.component_handlers.topic.handler import TopicHandler -from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.exception import ParsingException, ValidationError from kpops.utils.dict_ops import update_nested_pair @@ -48,31 +42,6 @@ class Pipeline(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - @classmethod - def create( - cls, - pipeline_path: Path, - kpops_config: KpopsConfig, - environment: str | None, - ) -> Pipeline: - registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") - - handlers = cls.setup_handlers(kpops_config) - parser = PipelineGenerator(kpops_config, registry, handlers) - return parser.load_yaml(pipeline_path, environment) - - @staticmethod - def setup_handlers(config: KpopsConfig) -> ComponentHandlers: - schema_handler = SchemaHandler.load_schema_handler(config) - connector_handler = KafkaConnectHandler.from_kpops_config(config) - proxy_wrapper = ProxyWrapper(config.kafka_rest) - topic_handler = TopicHandler(proxy_wrapper) - - return ComponentHandlers(schema_handler, connector_handler, topic_handler) - @property def step_names(self) -> list[str]: return [step.name for step in self.components] diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index f873f6731..18d35131b 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -1,5 +1,6 @@ from pytest_mock import MockerFixture +from kpops.api import setup_handlers from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, @@ -7,9 +8,10 @@ from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from kpops.config import KpopsConfig, SchemaRegistryConfig -from kpops.pipeline import Pipeline from tests.cli.resources.custom_module import CustomSchemaProvider +HANDLER_MODULE = "kpops.api" + MODULE = CustomSchemaProvider.__module__ @@ -18,12 +20,12 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): kafka_brokers="broker:9092", components_module=MODULE, ) - connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") + connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") - wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") + topic_handler_mock = mocker.patch(f"{HANDLER_MODULE}.TopicHandler") + wrapper = mocker.patch(f"{HANDLER_MODULE}.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -33,7 +35,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = Pipeline.setup_handlers(config) + actual_handlers = setup_handlers(config) connector_handler_mock.from_kpops_config.assert_called_once_with(config) @@ -51,16 +53,16 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): schema_registry=SchemaRegistryConfig(enabled=True), kafka_brokers="broker:9092", ) - schema_handler_mock = mocker.patch("kpops.pipeline.SchemaHandler") + schema_handler_mock = mocker.patch(f"{HANDLER_MODULE}.SchemaHandler") schema_handler = SchemaHandler.load_schema_handler(config) schema_handler_mock.load_schema_handler.return_value = schema_handler - connector_handler_mock = mocker.patch("kpops.pipeline.KafkaConnectHandler") + connector_handler_mock = mocker.patch(f"{HANDLER_MODULE}.KafkaConnectHandler") connector_handler = KafkaConnectHandler.from_kpops_config(config) connector_handler_mock.from_kpops_config.return_value = connector_handler - topic_handler_mock = mocker.patch("kpops.pipeline.TopicHandler") - wrapper = mocker.patch("kpops.pipeline.ProxyWrapper") + topic_handler_mock = mocker.patch(f"{HANDLER_MODULE}.TopicHandler") + wrapper = mocker.patch(f"{HANDLER_MODULE}.ProxyWrapper") topic_handler = TopicHandler(wrapper) topic_handler_mock.return_value = topic_handler @@ -70,7 +72,7 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): topic_handler=topic_handler, ) - actual_handlers = Pipeline.setup_handlers(config) + actual_handlers = setup_handlers(config) schema_handler_mock.load_schema_handler.assert_called_once_with(config) From 01409e57b11ddcf3cf954bc1c06a8e96c81653fa Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 10:34:26 +0200 Subject: [PATCH 17/21] move to api folder --- kpops/__init__.py | 2 +- kpops/api/__init__.py | 0 kpops/{ => api}/api.py | 0 kpops/{ => api}/exception.py | 4 ++++ kpops/cli/exception.py | 2 -- kpops/cli/registry.py | 2 +- kpops/component_handlers/schema_handler/schema_handler.py | 2 +- kpops/components/streams_bootstrap/streams/model.py | 2 +- kpops/pipeline.py | 2 +- tests/cli/test_handlers.py | 4 ++-- tests/components/test_streams_app.py | 2 +- tests/pipeline/test_generate.py | 4 ++-- 12 files changed, 14 insertions(+), 12 deletions(-) create mode 100644 kpops/api/__init__.py rename kpops/{ => api}/api.py (100%) rename kpops/{ => api}/exception.py (50%) delete mode 100644 kpops/cli/exception.py diff --git a/kpops/__init__.py b/kpops/__init__.py index ab41f32b2..7774836d5 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,7 +1,7 @@ __version__ = "5.0.1" # export public API functions -from kpops.api import clean, deploy, destroy, generate, init, manifest, reset +from kpops.api.api import clean, deploy, destroy, generate, init, manifest, reset __all__ = ( "generate", diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kpops/api.py b/kpops/api/api.py similarity index 100% rename from kpops/api.py rename to kpops/api/api.py diff --git a/kpops/exception.py b/kpops/api/exception.py similarity index 50% rename from kpops/exception.py rename to kpops/api/exception.py index 713274f3a..65094fd29 100644 --- a/kpops/exception.py +++ b/kpops/api/exception.py @@ -7,3 +7,7 @@ class ValidationError(Exception): class ParsingException(Exception): pass + + +class ClassNotFoundError(Exception): + """Similar to builtin `ModuleNotFoundError`; class doesn't exist inside module.""" diff --git a/kpops/cli/exception.py b/kpops/cli/exception.py deleted file mode 100644 index e9b0a65de..000000000 --- a/kpops/cli/exception.py +++ /dev/null @@ -1,2 +0,0 @@ -class ClassNotFoundError(Exception): - """Similar to builtin `ModuleNotFoundError`; class doesn't exist inside module.""" diff --git a/kpops/cli/registry.py b/kpops/cli/registry.py index 2b3b51631..2df483329 100644 --- a/kpops/cli/registry.py +++ b/kpops/cli/registry.py @@ -9,7 +9,7 @@ from typing import TYPE_CHECKING, TypeVar from kpops import __name__ -from kpops.cli.exception import ClassNotFoundError +from kpops.api.exception import ClassNotFoundError from kpops.components.base_components.pipeline_component import PipelineComponent if TYPE_CHECKING: diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 936ba0223..c436cd4b0 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -8,7 +8,7 @@ from schema_registry.client import AsyncSchemaRegistryClient from schema_registry.client.schema import AvroSchema -from kpops.cli.exception import ClassNotFoundError +from kpops.api.exception import ClassNotFoundError from kpops.cli.registry import find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index 97394259a..04f95b54b 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -5,12 +5,12 @@ import pydantic from pydantic import BaseModel, ConfigDict, Field, model_validator +from kpops.api.exception import ValidationError from kpops.components.base_components.kafka_app import ( KafkaAppValues, KafkaStreamsConfig, ) from kpops.components.base_components.models.topic import KafkaTopic, KafkaTopicStr -from kpops.exception import ValidationError from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import ( CamelCaseConfigModel, diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 1fbe3bfe6..5d3930820 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -15,10 +15,10 @@ computed_field, ) +from kpops.api.exception import ParsingException, ValidationError from kpops.cli.registry import Registry from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.pipeline_component import PipelineComponent -from kpops.exception import ParsingException, ValidationError from kpops.utils.dict_ops import update_nested_pair from kpops.utils.environment import ENV, PIPELINE_PATH from kpops.utils.yaml import load_yaml_file diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index 18d35131b..be1b63474 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -1,6 +1,6 @@ from pytest_mock import MockerFixture -from kpops.api import setup_handlers +from kpops.api.api import setup_handlers from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, @@ -10,7 +10,7 @@ from kpops.config import KpopsConfig, SchemaRegistryConfig from tests.cli.resources.custom_module import CustomSchemaProvider -HANDLER_MODULE = "kpops.api" +HANDLER_MODULE = "kpops.api.api" MODULE = CustomSchemaProvider.__module__ diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index d6a0f0d87..cb340174a 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -5,6 +5,7 @@ import pytest from pytest_mock import MockerFixture +from kpops.api.exception import ValidationError from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmDiffConfig, @@ -29,7 +30,6 @@ StreamsAppCleaner, ) from kpops.config import KpopsConfig, TopicNameConfig -from kpops.exception import ValidationError from tests.components import PIPELINE_BASE_DIR RESOURCES_PATH = Path(__file__).parent / "resources" diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 0fa5a9a03..95ec7d2fb 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -10,9 +10,9 @@ from typer.testing import CliRunner import kpops +from kpops.api.exception import ParsingException, ValidationError from kpops.cli.main import FilterType, app from kpops.components import KafkaSinkConnector, PipelineComponent -from kpops.exception import ParsingException, ValidationError runner = CliRunner() @@ -23,7 +23,7 @@ class TestGenerate: @pytest.fixture(autouse=True) def log_info(self, mocker: MockerFixture) -> MagicMock: - return mocker.patch("kpops.api.log.info") + return mocker.patch("kpops.api.api.log.info") def test_python_api(self): pipeline = kpops.generate( From d739b2f2ef2e6bfe13925eb90434068fb77cb7ea Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 22 May 2024 10:37:10 +0200 Subject: [PATCH 18/21] move to api folder --- hooks/gen_docs/gen_docs_components.py | 2 +- kpops/api/api.py | 6 +++--- kpops/{cli => api}/custom_formatter.py | 0 kpops/{cli => api}/options.py | 0 kpops/{cli => api}/registry.py | 0 kpops/cli/main.py | 2 +- kpops/component_handlers/schema_handler/schema_handler.py | 2 +- kpops/pipeline.py | 2 +- kpops/utils/gen_schema.py | 2 +- tests/cli/test_registry.py | 2 +- tests/cli/test_schema_generation.py | 2 +- tests/pipeline/test_pipeline.py | 2 +- 12 files changed, 11 insertions(+), 11 deletions(-) rename kpops/{cli => api}/custom_formatter.py (100%) rename kpops/{cli => api}/options.py (100%) rename kpops/{cli => api}/registry.py (100%) diff --git a/hooks/gen_docs/gen_docs_components.py b/hooks/gen_docs/gen_docs_components.py index 2e0a4f1a0..10bb40af7 100644 --- a/hooks/gen_docs/gen_docs_components.py +++ b/hooks/gen_docs/gen_docs_components.py @@ -8,7 +8,7 @@ import yaml from hooks import ROOT -from kpops.cli.registry import _find_classes +from kpops.api.registry import _find_classes from kpops.components import KafkaConnector, PipelineComponent from kpops.utils.colorify import redify, yellowify from kpops.utils.pydantic import issubclass_patched diff --git a/kpops/api/api.py b/kpops/api/api.py index 69be4fdb2..ec0ddc676 100644 --- a/kpops/api/api.py +++ b/kpops/api/api.py @@ -6,9 +6,9 @@ from typing import TYPE_CHECKING import kpops -from kpops.cli.custom_formatter import CustomFormatter -from kpops.cli.options import FilterType -from kpops.cli.registry import Registry +from kpops.api.custom_formatter import CustomFormatter +from kpops.api.options import FilterType +from kpops.api.registry import Registry from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, diff --git a/kpops/cli/custom_formatter.py b/kpops/api/custom_formatter.py similarity index 100% rename from kpops/cli/custom_formatter.py rename to kpops/api/custom_formatter.py diff --git a/kpops/cli/options.py b/kpops/api/options.py similarity index 100% rename from kpops/cli/options.py rename to kpops/api/options.py diff --git a/kpops/cli/registry.py b/kpops/api/registry.py similarity index 100% rename from kpops/cli/registry.py rename to kpops/api/registry.py diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 7d7d05f68..3f664f109 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -8,7 +8,7 @@ import kpops from kpops import __version__ -from kpops.cli.options import FilterType +from kpops.api.options import FilterType from kpops.cli.utils import collect_pipeline_paths from kpops.config import ENV_PREFIX, KpopsConfig from kpops.utils.gen_schema import ( diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index c436cd4b0..94b5cd3bf 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -9,7 +9,7 @@ from schema_registry.client.schema import AvroSchema from kpops.api.exception import ClassNotFoundError -from kpops.cli.registry import find_class +from kpops.api.registry import find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 5d3930820..4f92792ac 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -16,7 +16,7 @@ ) from kpops.api.exception import ParsingException, ValidationError -from kpops.cli.registry import Registry +from kpops.api.registry import Registry from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.utils.dict_ops import update_nested_pair diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index b07749ec6..223e8adda 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -21,7 +21,7 @@ ModelFieldsSchema, ) -from kpops.cli.registry import _find_classes +from kpops.api.registry import _find_classes from kpops.components import ( PipelineComponent, ) diff --git a/tests/cli/test_registry.py b/tests/cli/test_registry.py index f305c24a3..ba91eb61d 100644 --- a/tests/cli/test_registry.py +++ b/tests/cli/test_registry.py @@ -2,7 +2,7 @@ import pytest -from kpops.cli.registry import ClassNotFoundError, Registry, _find_classes, find_class +from kpops.api.registry import ClassNotFoundError, Registry, _find_classes, find_class from kpops.component_handlers.schema_handler.schema_provider import SchemaProvider from kpops.components.base_components.pipeline_component import PipelineComponent from tests.cli.resources.custom_module import CustomSchemaProvider diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py index 4e1b4aa09..0741340e0 100644 --- a/tests/cli/test_schema_generation.py +++ b/tests/cli/test_schema_generation.py @@ -9,8 +9,8 @@ from pydantic import ConfigDict, Field from typer.testing import CliRunner +from kpops.api.registry import Registry from kpops.cli.main import app -from kpops.cli.registry import Registry from kpops.components import PipelineComponent from kpops.utils.docstring import describe_attr diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 6f79c8f37..b5c741ad0 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -3,7 +3,7 @@ import pytest from polyfactory.factories.pydantic_factory import ModelFactory -from kpops.cli.options import FilterType +from kpops.api.options import FilterType from kpops.component_handlers import ( ComponentHandlers, ) From 1b34acadbd9666738942641e592347611d5deaa5 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Thu, 23 May 2024 11:26:54 +0200 Subject: [PATCH 19/21] Update files --- kpops/api/api.py | 291 ---------------------------------- kpops/api/custom_formatter.py | 0 2 files changed, 291 deletions(-) delete mode 100644 kpops/api/api.py delete mode 100644 kpops/api/custom_formatter.py diff --git a/kpops/api/api.py b/kpops/api/api.py deleted file mode 100644 index ec0ddc676..000000000 --- a/kpops/api/api.py +++ /dev/null @@ -1,291 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -from pathlib import Path -from typing import TYPE_CHECKING - -import kpops -from kpops.api.custom_formatter import CustomFormatter -from kpops.api.options import FilterType -from kpops.api.registry import Registry -from kpops.component_handlers import ComponentHandlers -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) -from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler -from kpops.component_handlers.topic.handler import TopicHandler -from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper -from kpops.config import KpopsConfig -from kpops.pipeline import ( - PipelineGenerator, -) -from kpops.utils.cli_commands import init_project - -if TYPE_CHECKING: - from kpops.components import PipelineComponent - from kpops.components.base_components.models.resource import Resource - from kpops.pipeline import ( - Pipeline, - ) - -logger = logging.getLogger() -logging.getLogger("httpx").setLevel(logging.WARNING) -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(CustomFormatter()) -logger.addHandler(stream_handler) - -log = logging.getLogger("") -LOG_DIVIDER = "#" * 100 - - -def log_action(action: str, pipeline_component: PipelineComponent): - log.info("\n") - log.info(LOG_DIVIDER) - log.info(f"{action} {pipeline_component.name}") - log.info(LOG_DIVIDER) - log.info("\n") - - -def parse_steps(steps: str) -> set[str]: - return set(steps.split(",")) - - -def generate( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - verbose: bool = False, -) -> Pipeline: - kpops_config = KpopsConfig.create( - config, - dotenv, - environment, - verbose, - ) - pipeline = create_pipeline(pipeline_path, kpops_config, environment) - log.info(f"Picked up pipeline '{pipeline_path.parent.name}'") - if steps: - component_names = parse_steps(steps) - log.debug( - f"KPOPS_PIPELINE_STEPS is defined with values: {component_names} and filter type of {filter_type.value}" - ) - - predicate = filter_type.create_default_step_names_filter_predicate( - component_names - ) - pipeline.filter(predicate) - log.info(f"Filtered pipeline:\n{pipeline.step_names}") - return pipeline - - -def manifest( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - verbose: bool = False, -) -> list[Resource]: - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - resources: list[Resource] = [] - for component in pipeline.components: - resource = component.manifest() - resources.append(resource) - return resources - - -def deploy( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def deploy_runner(component: PipelineComponent): - log_action("Deploy", component) - await component.deploy(dry_run) - - async def async_deploy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(deploy_runner) - await pipeline_tasks - else: - for component in pipeline.components: - await deploy_runner(component) - - asyncio.run(async_deploy()) - - -def destroy( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def destroy_runner(component: PipelineComponent): - log_action("Destroy", component) - await component.destroy(dry_run) - - async def async_destroy(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph( - destroy_runner, reverse=True - ) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await destroy_runner(component) - - asyncio.run(async_destroy()) - - -def reset( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def reset_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Reset", component) - await component.reset(dry_run) - - async def async_reset(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(reset_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await reset_runner(component) - - asyncio.run(async_reset()) - - -def clean( - pipeline_path: Path, - dotenv: list[Path] | None = None, - config: Path = Path(), - steps: str | None = None, - filter_type: FilterType = FilterType.INCLUDE, - environment: str | None = None, - dry_run: bool = True, - verbose: bool = True, - parallel: bool = False, -): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=steps, - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - - async def clean_runner(component: PipelineComponent): - await component.destroy(dry_run) - log_action("Clean", component) - await component.clean(dry_run) - - async def async_clean(): - if parallel: - pipeline_tasks = pipeline.build_execution_graph(clean_runner, reverse=True) - await pipeline_tasks - else: - for component in reversed(pipeline.components): - await clean_runner(component) - - asyncio.run(async_clean()) - - -def init( - path: Path, - config_include_opt: bool = False, -): - if not path.exists(): - path.mkdir(parents=False) - elif next(path.iterdir(), False): - log.warning("Please provide a path to an empty directory.") - return - init_project(path, config_include_opt) - - -def create_pipeline( - pipeline_path: Path, - kpops_config: KpopsConfig, - environment: str | None, -) -> Pipeline: - registry = Registry() - if kpops_config.components_module: - registry.find_components(kpops_config.components_module) - registry.find_components("kpops.components") - - handlers = setup_handlers(kpops_config) - parser = PipelineGenerator(kpops_config, registry, handlers) - return parser.load_yaml(pipeline_path, environment) - - -def setup_handlers(config: KpopsConfig) -> ComponentHandlers: - schema_handler = SchemaHandler.load_schema_handler(config) - connector_handler = KafkaConnectHandler.from_kpops_config(config) - proxy_wrapper = ProxyWrapper(config.kafka_rest) - topic_handler = TopicHandler(proxy_wrapper) - - return ComponentHandlers(schema_handler, connector_handler, topic_handler) diff --git a/kpops/api/custom_formatter.py b/kpops/api/custom_formatter.py deleted file mode 100644 index e69de29bb..000000000 From 7258f953f3fd9840a4f529dc79fe1ea315ec91c3 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Thu, 23 May 2024 11:27:39 +0200 Subject: [PATCH 20/21] Update files --- kpops/pipeline.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 4f92792ac..f0b1aa6b8 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -55,10 +55,6 @@ def components(self) -> list[SerializeAsAny[PipelineComponent]]: def last(self) -> PipelineComponent: return self.components[-1] - def extend(self, pipeline: Pipeline) -> None: - for component in pipeline.components: - self.add(component) - def add(self, component: PipelineComponent) -> None: if self._component_index.get(component.id) is not None: msg = ( From eeac4805925f22e51e3e69ffb136b865a45ddd42 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 24 May 2024 09:11:14 +0200 Subject: [PATCH 21/21] add tests --- .../docs/resources/variables/cli_env_vars.env | 5 +- docs/docs/resources/variables/cli_env_vars.md | 2 +- docs/docs/user/references/cli-commands.md | 24 +- kpops/cli/main.py | 30 +- kpops/cli/utils.py | 23 +- .../pipeline.yaml | 196 +++++++------- .../pipeline.yaml | 256 ++++++++++++++++++ tests/pipeline/test_generate.py | 14 + 8 files changed, 411 insertions(+), 139 deletions(-) create mode 100644 tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml diff --git a/docs/docs/resources/variables/cli_env_vars.env b/docs/docs/resources/variables/cli_env_vars.env index c51e2ee85..21436ded7 100644 --- a/docs/docs/resources/variables/cli_env_vars.env +++ b/docs/docs/resources/variables/cli_env_vars.env @@ -14,7 +14,8 @@ KPOPS_DOTENV_PATH # No default value, not required # Suffix your environment files with this value (e.g. # defaults_development.yaml for environment=development). KPOPS_ENVIRONMENT # No default value, not required -# Path to YAML with pipeline definition -KPOPS_PIPELINE_PATH # No default value, required +# Paths to dir containing 'pipeline.yaml' or files named +# 'pipeline.yaml'. +KPOPS_PIPELINE_PATHS # No default value, required # Comma separated list of steps to apply the command on KPOPS_PIPELINE_STEPS # No default value, not required diff --git a/docs/docs/resources/variables/cli_env_vars.md b/docs/docs/resources/variables/cli_env_vars.md index 87f8d2b2c..da6a2d994 100644 --- a/docs/docs/resources/variables/cli_env_vars.md +++ b/docs/docs/resources/variables/cli_env_vars.md @@ -5,5 +5,5 @@ These variables take precedence over the commands' flags. If a variable is set, |KPOPS_CONFIG_PATH |. |False |Path to the dir containing config.yaml files | |KPOPS_DOTENV_PATH | |False |Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. | |KPOPS_ENVIRONMENT | |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).| -|KPOPS_PIPELINE_PATH | |True |Path to YAML with pipeline definition | +|KPOPS_PIPELINE_PATHS| |True |Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. | |KPOPS_PIPELINE_STEPS| |False |Comma separated list of steps to apply the command on | diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 62d940123..570563069 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -31,12 +31,12 @@ Clean pipeline steps **Usage**: ```console -$ kpops clean [OPTIONS] PIPELINE_PATH +$ kpops clean [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -57,12 +57,12 @@ Deploy pipeline steps **Usage**: ```console -$ kpops deploy [OPTIONS] PIPELINE_PATH +$ kpops deploy [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -83,12 +83,12 @@ Destroy pipeline steps **Usage**: ```console -$ kpops destroy [OPTIONS] PIPELINE_PATH +$ kpops destroy [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -109,12 +109,12 @@ Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps **Usage**: ```console -$ kpops generate [OPTIONS] PIPELINE_PATH +$ kpops generate [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -152,12 +152,12 @@ In addition to generate, render final resource representation for each pipeline **Usage**: ```console -$ kpops manifest [OPTIONS] PIPELINE_PATH +$ kpops manifest [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -176,12 +176,12 @@ Reset pipeline steps **Usage**: ```console -$ kpops reset [OPTIONS] PIPELINE_PATH +$ kpops reset [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 52d86d01c..056082769 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -42,14 +42,14 @@ help="Path to the dir containing config.yaml files", ) -PIPELINE_PATH_ARG: Path = typer.Argument( +PIPELINE_PATHS_ARG: list[Path] = typer.Argument( default=..., exists=True, file_okay=True, dir_okay=True, readable=True, - envvar=f"{ENV_PREFIX}PIPELINE_PATH", - help="Path to YAML with pipeline definition", + envvar=f"{ENV_PREFIX}PIPELINE_PATHS", + help="Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'.", ) PROJECT_PATH: Path = typer.Argument( @@ -161,7 +161,7 @@ def schema( help="Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).", ) def generate( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -169,7 +169,7 @@ def generate( environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, ): - for pipeline_file_path in collect_pipeline_paths(pipeline_path): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): pipeline = kpops.generate( pipeline_path=pipeline_file_path, dotenv=dotenv, @@ -187,7 +187,7 @@ def generate( help="In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests.", ) def manifest( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -195,7 +195,7 @@ def manifest( environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, ): - for pipeline_file_path in collect_pipeline_paths(pipeline_path): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): resources = kpops.manifest( pipeline_path=pipeline_file_path, dotenv=dotenv, @@ -212,7 +212,7 @@ def manifest( @app.command(help="Deploy pipeline steps") def deploy( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -222,7 +222,7 @@ def deploy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - for pipeline_file_path in collect_pipeline_paths(pipeline_path): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): kpops.deploy( pipeline_path=pipeline_file_path, dotenv=dotenv, @@ -238,7 +238,7 @@ def deploy( @app.command(help="Destroy pipeline steps") def destroy( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -248,7 +248,7 @@ def destroy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - for pipeline_file_path in collect_pipeline_paths(pipeline_path): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): kpops.destroy( pipeline_path=pipeline_file_path, dotenv=dotenv, @@ -264,7 +264,7 @@ def destroy( @app.command(help="Reset pipeline steps") def reset( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -274,7 +274,7 @@ def reset( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - for pipeline_file_path in collect_pipeline_paths(pipeline_path): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): kpops.reset( pipeline_path=pipeline_file_path, dotenv=dotenv, @@ -290,7 +290,7 @@ def reset( @app.command(help="Clean pipeline steps") def clean( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -300,7 +300,7 @@ def clean( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - for pipeline_file_path in collect_pipeline_paths(pipeline_path): + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): kpops.clean( pipeline_path=pipeline_file_path, dotenv=dotenv, diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py index 986d95e95..065852d31 100644 --- a/kpops/cli/utils.py +++ b/kpops/cli/utils.py @@ -1,23 +1,24 @@ from __future__ import annotations -from collections.abc import Iterator +from collections.abc import Iterable, Iterator from pathlib import Path -def collect_pipeline_paths(pipeline_path: Path) -> Iterator[Path]: +def collect_pipeline_paths(pipeline_paths: Iterable[Path]) -> Iterator[Path]: """Generate paths to pipeline files. - :param pipeline_path: The path to the pipeline file or directory. + :param pipeline_paths: The list of paths to the pipeline files or directories. :yields: Path: Paths to pipeline files. If `pipeline_path` file yields the given path. For a directory it yields all the pipeline.yaml paths. - :raises: RuntimeError: If `pipeline_path` is neither a file nor a directory. + :raises: ValueError: If `pipeline_path` is neither a file nor a directory. """ - if pipeline_path.is_file(): - yield pipeline_path - elif pipeline_path.is_dir(): - yield from pipeline_path.glob("**/pipeline*.yaml") - else: - msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file." - raise RuntimeError(msg) + for pipeline_path in pipeline_paths: + if pipeline_path.is_file(): + yield pipeline_path + elif pipeline_path.is_dir(): + yield from sorted(pipeline_path.glob("**/pipeline*.yaml")) + else: + msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file." + raise ValueError(msg) diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml index 9eecaaf14..3ac0f5501 100644 --- a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml @@ -1,102 +1,58 @@ - _cleaner: app: - autoscaling: - consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name - cooldownPeriod: 300 - enabled: true - lagThreshold: 10000 - maxReplicas: 4 - minReplicas: 4 - offsetResetPolicy: earliest - pollingInterval: 30 - topics: - - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name commandLine: - TYPE: nothing - image: fake-registry/filter - imageTag: 2.4.1 - persistence: - enabled: false - replicaCount: 4 - resources: - requests: - memory: 3G - statefulSet: false + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * streams: brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 - config: - large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator - errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error - outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-5e568-clean - helm_release_name: resources-pipeline-folders-pipeline-3-a-l-5e568-clean - name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer-clean + helm_release_name: resources-pipeline-folders-pipeline-1-sch-7b7df-clean + name: scheduled-producer namespace: example-namespace - prefix: resources-pipeline-folders-pipeline-3- + prefix: resources-pipeline-folders-pipeline-1- repo_config: repo_auth_flags: insecure_skip_tls_verify: false repository_name: bakdata-streams-bootstrap url: https://bakdata.github.io/streams-bootstrap/ suffix: -clean - type: streams-app-cleaner + type: producer-app-cleaner version: 2.4.2 app: - autoscaling: - consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name - cooldownPeriod: 300 - enabled: true - lagThreshold: 10000 - maxReplicas: 4 - minReplicas: 4 - offsetResetPolicy: earliest - pollingInterval: 30 - topics: - - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name commandLine: - TYPE: nothing - image: fake-registry/filter - imageTag: 2.4.1 - persistence: - enabled: false - replicaCount: 4 - resources: - requests: - memory: 3G - statefulSet: false + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * streams: brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 - config: - large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator - errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error - outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-long--f9195 - helm_release_name: resources-pipeline-folders-pipeline-3-a-long-na-f9195 - name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer + helm_release_name: resources-pipeline-folders-pipeline-1-scheduled-066a8 + name: scheduled-producer namespace: example-namespace - prefix: resources-pipeline-folders-pipeline-3- + prefix: resources-pipeline-folders-pipeline-1- repo_config: repo_auth_flags: insecure_skip_tls_verify: false repository_name: bakdata-streams-bootstrap url: https://bakdata.github.io/streams-bootstrap/ to: - models: {} + models: + com/bakdata/kafka/fake: 1.0.0 topics: - ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name - : configs: - retention.ms: '-1' - partitions_count: 50 - type: output - ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error - : configs: + resources-pipeline-folders-pipeline-1-scheduled-producer: + configs: cleanup.policy: compact,delete - partitions_count: 1 - type: error - value_schema: com.bakdata.kafka.DeadLetter - type: filter + partitions_count: 12 + type: output + value_schema: com.bakdata.fake.Produced + type: scheduled-producer version: 2.4.2 - _cleaner: @@ -199,58 +155,102 @@ - _cleaner: app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name commandLine: - FAKE_ARG: fake-arg-value - image: example-registry/fake-image - imageTag: 0.0.1 - schedule: 30 3/8 * * * + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false streams: brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 - outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer-clean - helm_release_name: resources-pipeline-folders-pipeline-1-sch-7b7df-clean - name: scheduled-producer + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-5e568-clean + helm_release_name: resources-pipeline-folders-pipeline-3-a-l-5e568-clean + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name namespace: example-namespace - prefix: resources-pipeline-folders-pipeline-1- + prefix: resources-pipeline-folders-pipeline-3- repo_config: repo_auth_flags: insecure_skip_tls_verify: false repository_name: bakdata-streams-bootstrap url: https://bakdata.github.io/streams-bootstrap/ suffix: -clean - type: producer-app-cleaner + type: streams-app-cleaner version: 2.4.2 app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name commandLine: - FAKE_ARG: fake-arg-value - image: example-registry/fake-image - imageTag: 0.0.1 - schedule: 30 3/8 * * * + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false streams: brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 - outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name schemaRegistryUrl: http://localhost:8081/ - helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer - helm_release_name: resources-pipeline-folders-pipeline-1-scheduled-066a8 - name: scheduled-producer + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-long--f9195 + helm_release_name: resources-pipeline-folders-pipeline-3-a-long-na-f9195 + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name namespace: example-namespace - prefix: resources-pipeline-folders-pipeline-1- + prefix: resources-pipeline-folders-pipeline-3- repo_config: repo_auth_flags: insecure_skip_tls_verify: false repository_name: bakdata-streams-bootstrap url: https://bakdata.github.io/streams-bootstrap/ to: - models: - com/bakdata/kafka/fake: 1.0.0 + models: {} topics: - resources-pipeline-folders-pipeline-1-scheduled-producer: - configs: - cleanup.policy: compact,delete - partitions_count: 12 + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + : configs: + retention.ms: '-1' + partitions_count: 50 type: output - value_schema: com.bakdata.fake.Produced - type: scheduled-producer + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + : configs: + cleanup.policy: compact,delete + partitions_count: 1 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: filter version: 2.4.2 diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml new file mode 100644 index 000000000..3ac0f5501 --- /dev/null +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml @@ -0,0 +1,256 @@ +- _cleaner: + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer-clean + helm_release_name: resources-pipeline-folders-pipeline-1-sch-7b7df-clean + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: producer-app-cleaner + version: 2.4.2 + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer + helm_release_name: resources-pipeline-folders-pipeline-1-scheduled-066a8 + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: + com/bakdata/kafka/fake: 1.0.0 + topics: + resources-pipeline-folders-pipeline-1-scheduled-producer: + configs: + cleanup.policy: compact,delete + partitions_count: 12 + type: output + value_schema: com.bakdata.fake.Produced + type: scheduled-producer + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-2-converter-clean + helm_release_name: resources-pipeline-folders-pipeline-2-converter-clean + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-2-converter + helm_release_name: resources-pipeline-folders-pipeline-2-converter + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + resources-pipeline-folders-pipeline-2-converter: + configs: + cleanup.policy: compact,delete + retention.ms: '-1' + partitions_count: 50 + type: output + resources-pipeline-folders-pipeline-2-converter-error: + configs: + cleanup.policy: compact,delete + partitions_count: 10 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: converter + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-5e568-clean + helm_release_name: resources-pipeline-folders-pipeline-3-a-l-5e568-clean + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-long--f9195 + helm_release_name: resources-pipeline-folders-pipeline-3-a-long-na-f9195 + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + : configs: + retention.ms: '-1' + partitions_count: 50 + type: output + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + : configs: + cleanup.policy: compact,delete + partitions_count: 1 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: filter + version: 2.4.2 + diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index ba1359c50..841d12f81 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -90,6 +90,20 @@ def test_load_pipeline_with_folder_path(self, snapshot: Snapshot): snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_load_pipeline_with_multiple_pipeline_paths(self, snapshot: Snapshot): + path_1 = RESOURCE_PATH / "pipeline-folders/pipeline-1/pipeline.yaml" + path_2 = RESOURCE_PATH / "pipeline-folders/pipeline-2/pipeline.yaml" + path_3 = RESOURCE_PATH / "pipeline-folders/pipeline-3/pipeline.yaml" + result = runner.invoke( + app, + ["generate", str(path_1), str(path_2), str(path_3)], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.stdout + + snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_name_equal_prefix_name_concatenation(self): result = runner.invoke( app,