diff --git a/docs/docs/resources/variables/cli_env_vars.env b/docs/docs/resources/variables/cli_env_vars.env index c51e2ee85..21436ded7 100644 --- a/docs/docs/resources/variables/cli_env_vars.env +++ b/docs/docs/resources/variables/cli_env_vars.env @@ -14,7 +14,8 @@ KPOPS_DOTENV_PATH # No default value, not required # Suffix your environment files with this value (e.g. # defaults_development.yaml for environment=development). KPOPS_ENVIRONMENT # No default value, not required -# Path to YAML with pipeline definition -KPOPS_PIPELINE_PATH # No default value, required +# Paths to dir containing 'pipeline.yaml' or files named +# 'pipeline.yaml'. +KPOPS_PIPELINE_PATHS # No default value, required # Comma separated list of steps to apply the command on KPOPS_PIPELINE_STEPS # No default value, not required diff --git a/docs/docs/resources/variables/cli_env_vars.md b/docs/docs/resources/variables/cli_env_vars.md index 87f8d2b2c..da6a2d994 100644 --- a/docs/docs/resources/variables/cli_env_vars.md +++ b/docs/docs/resources/variables/cli_env_vars.md @@ -5,5 +5,5 @@ These variables take precedence over the commands' flags. If a variable is set, |KPOPS_CONFIG_PATH |. |False |Path to the dir containing config.yaml files | |KPOPS_DOTENV_PATH | |False |Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. | |KPOPS_ENVIRONMENT | |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).| -|KPOPS_PIPELINE_PATH | |True |Path to YAML with pipeline definition | +|KPOPS_PIPELINE_PATHS| |True |Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. 
| |KPOPS_PIPELINE_STEPS| |False |Comma separated list of steps to apply the command on | diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 62d940123..570563069 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -31,12 +31,12 @@ Clean pipeline steps **Usage**: ```console -$ kpops clean [OPTIONS] PIPELINE_PATH +$ kpops clean [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -57,12 +57,12 @@ Deploy pipeline steps **Usage**: ```console -$ kpops deploy [OPTIONS] PIPELINE_PATH +$ kpops deploy [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -83,12 +83,12 @@ Destroy pipeline steps **Usage**: ```console -$ kpops destroy [OPTIONS] PIPELINE_PATH +$ kpops destroy [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -109,12 +109,12 @@ Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps **Usage**: ```console -$ kpops generate [OPTIONS] PIPELINE_PATH +$ kpops generate [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. 
[env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -152,12 +152,12 @@ In addition to generate, render final resource representation for each pipeline **Usage**: ```console -$ kpops manifest [OPTIONS] PIPELINE_PATH +$ kpops manifest [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: @@ -176,12 +176,12 @@ Reset pipeline steps **Usage**: ```console -$ kpops reset [OPTIONS] PIPELINE_PATH +$ kpops reset [OPTIONS] PIPELINE_PATHS... ``` **Arguments**: -* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required] +* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required] **Options**: diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 3a3c17471..056082769 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -8,6 +8,7 @@ import kpops from kpops import __version__ from kpops.api.options import FilterType +from kpops.cli.utils import collect_pipeline_paths from kpops.config import ENV_PREFIX, KpopsConfig from kpops.utils.gen_schema import ( SchemaScope, @@ -41,14 +42,14 @@ help="Path to the dir containing config.yaml files", ) -PIPELINE_PATH_ARG: Path = typer.Argument( +PIPELINE_PATHS_ARG: list[Path] = typer.Argument( default=..., exists=True, file_okay=True, - dir_okay=False, + dir_okay=True, readable=True, - envvar=f"{ENV_PREFIX}PIPELINE_PATH", - help="Path to YAML with pipeline definition", + envvar=f"{ENV_PREFIX}PIPELINE_PATHS", + help="Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'.", ) PROJECT_PATH: Path = typer.Argument( @@ -160,7 +161,7 @@ def schema( help="Enrich pipeline steps with defaults. 
The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).", ) def generate( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -168,16 +169,17 @@ def generate( environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, ): - pipeline = kpops.generate( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=parse_steps(steps), - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - print_yaml(pipeline.to_yaml()) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + pipeline = kpops.generate( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + print_yaml(pipeline.to_yaml()) @app.command( @@ -185,7 +187,7 @@ def generate( help="In addition to generate, render final resource representation for each pipeline step, e.g. 
Kubernetes manifests.", ) def manifest( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -193,23 +195,24 @@ def manifest( environment: Optional[str] = ENVIRONMENT, verbose: bool = VERBOSE_OPTION, ): - resources = kpops.manifest( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=parse_steps(steps), - filter_type=filter_type, - environment=environment, - verbose=verbose, - ) - for resource in resources: - for rendered_manifest in resource: - print_yaml(rendered_manifest) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + resources = kpops.manifest( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + verbose=verbose, + ) + for resource in resources: + for rendered_manifest in resource: + print_yaml(rendered_manifest) @app.command(help="Deploy pipeline steps") def deploy( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -219,22 +222,23 @@ def deploy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - kpops.deploy( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=parse_steps(steps), - filter_type=filter_type, - environment=environment, - dry_run=dry_run, - verbose=verbose, - parallel=parallel, - ) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.deploy( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) @app.command(help="Destroy pipeline steps") def destroy( - 
pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -244,22 +248,23 @@ def destroy( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - kpops.destroy( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=parse_steps(steps), - filter_type=filter_type, - environment=environment, - dry_run=dry_run, - verbose=verbose, - parallel=parallel, - ) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.destroy( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) @app.command(help="Reset pipeline steps") def reset( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = PIPELINE_STEPS, @@ -269,22 +274,23 @@ def reset( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - kpops.reset( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=parse_steps(steps), - filter_type=filter_type, - environment=environment, - dry_run=dry_run, - verbose=verbose, - parallel=parallel, - ) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.reset( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) @app.command(help="Clean pipeline steps") def clean( - pipeline_path: Path = PIPELINE_PATH_ARG, + pipeline_paths: list[Path] = PIPELINE_PATHS_ARG, dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION, config: Path = CONFIG_PATH_OPTION, steps: Optional[str] = 
PIPELINE_STEPS, @@ -294,17 +300,18 @@ def clean( verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, ): - kpops.clean( - pipeline_path=pipeline_path, - dotenv=dotenv, - config=config, - steps=parse_steps(steps), - filter_type=filter_type, - environment=environment, - dry_run=dry_run, - verbose=verbose, - parallel=parallel, - ) + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.clean( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) def version_callback(show_version: bool) -> None: diff --git a/kpops/cli/utils.py b/kpops/cli/utils.py new file mode 100644 index 000000000..065852d31 --- /dev/null +++ b/kpops/cli/utils.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from collections.abc import Iterable, Iterator +from pathlib import Path + + +def collect_pipeline_paths(pipeline_paths: Iterable[Path]) -> Iterator[Path]: +    """Generate paths to pipeline files. + +    :param pipeline_paths: The list of paths to the pipeline files or directories. + +    :yields: Path: Paths to pipeline files. If a `pipeline_path` is a file, the given path is yielded. +        For a directory, all contained `pipeline*.yaml` paths are yielded recursively, in sorted order. + +    :raises ValueError: If a `pipeline_path` is neither a file nor a directory. +    """ + for pipeline_path in pipeline_paths: + if pipeline_path.is_file(): + yield pipeline_path + elif pipeline_path.is_dir(): + yield from sorted(pipeline_path.glob("**/pipeline*.yaml")) + else: + msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file." 
+ raise ValueError(msg) diff --git a/kpops/pipeline.py b/kpops/pipeline.py index 5545949c7..f0b1aa6b8 100644 --- a/kpops/pipeline.py +++ b/kpops/pipeline.py @@ -68,7 +68,7 @@ def remove(self, component_id: str) -> None: self._component_index.pop(component_id) def get(self, component_id: str) -> PipelineComponent | None: - self._component_index.get(component_id) + return self._component_index.get(component_id) def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]: """Find pipeline components matching a custom predicate. diff --git a/tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml new file mode 100644 index 000000000..503665e9a --- /dev/null +++ b/tests/pipeline/resources/pipeline-folders/pipeline-1/pipeline.yaml @@ -0,0 +1,5 @@ +- type: scheduled-producer + app: + commandLine: + FAKE_ARG: "fake-arg-value" + schedule: "30 3/8 * * *" diff --git a/tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml new file mode 100644 index 000000000..0dfc1da57 --- /dev/null +++ b/tests/pipeline/resources/pipeline-folders/pipeline-2/pipeline.yaml @@ -0,0 +1,9 @@ +- type: converter + app: + commandLine: + CONVERT_XML: true + resources: + limits: + memory: 2G + requests: + memory: 2G diff --git a/tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml b/tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml new file mode 100644 index 000000000..99571e5e6 --- /dev/null +++ b/tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml @@ -0,0 +1,12 @@ +- type: filter + name: "a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name" + app: + commandLine: + TYPE: "nothing" + resources: + requests: + memory: 3G + replicaCount: 4 + autoscaling: + minReplicas: 4 + maxReplicas: 4 
diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml new file mode 100644 index 000000000..3ac0f5501 --- /dev/null +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_folder_path/pipeline.yaml @@ -0,0 +1,256 @@ +- _cleaner: + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer-clean + helm_release_name: resources-pipeline-folders-pipeline-1-sch-7b7df-clean + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: producer-app-cleaner + version: 2.4.2 + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer + helm_release_name: resources-pipeline-folders-pipeline-1-scheduled-066a8 + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: + 
com/bakdata/kafka/fake: 1.0.0 + topics: + resources-pipeline-folders-pipeline-1-scheduled-producer: + configs: + cleanup.policy: compact,delete + partitions_count: 12 + type: output + value_schema: com.bakdata.fake.Produced + type: scheduled-producer + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-2-converter-clean + helm_release_name: resources-pipeline-folders-pipeline-2-converter-clean + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + 
config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-2-converter + helm_release_name: resources-pipeline-folders-pipeline-2-converter + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + resources-pipeline-folders-pipeline-2-converter: + configs: + cleanup.policy: compact,delete + retention.ms: '-1' + partitions_count: 50 + type: output + resources-pipeline-folders-pipeline-2-converter-error: + configs: + cleanup.policy: compact,delete + partitions_count: 10 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: converter + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + 
errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-5e568-clean + helm_release_name: resources-pipeline-folders-pipeline-3-a-l-5e568-clean + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: 
resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-long--f9195 + helm_release_name: resources-pipeline-folders-pipeline-3-a-long-na-f9195 + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + : configs: + retention.ms: '-1' + partitions_count: 50 + type: output + ? 
resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + : configs: + cleanup.policy: compact,delete + partitions_count: 1 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: filter + version: 2.4.2 + diff --git a/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml new file mode 100644 index 000000000..3ac0f5501 --- /dev/null +++ b/tests/pipeline/snapshots/test_generate/test_load_pipeline_with_multiple_pipeline_paths/pipeline.yaml @@ -0,0 +1,256 @@ +- _cleaner: + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer-clean + helm_release_name: resources-pipeline-folders-pipeline-1-sch-7b7df-clean + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: producer-app-cleaner + version: 2.4.2 + app: + commandLine: + FAKE_ARG: fake-arg-value + image: example-registry/fake-image + imageTag: 0.0.1 + schedule: 30 3/8 * * * + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + outputTopic: resources-pipeline-folders-pipeline-1-scheduled-producer + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-1-scheduled-producer + 
helm_release_name: resources-pipeline-folders-pipeline-1-scheduled-066a8 + name: scheduled-producer + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-1- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: + com/bakdata/kafka/fake: 1.0.0 + topics: + resources-pipeline-folders-pipeline-1-scheduled-producer: + configs: + cleanup.policy: compact,delete + partitions_count: 12 + type: output + value_schema: com.bakdata.fake.Produced + type: scheduled-producer + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-2-converter-clean + helm_release_name: resources-pipeline-folders-pipeline-2-converter-clean + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: converter-resources-pipeline-folders-pipeline-2-converter + cooldownPeriod: 300 + enabled: 
true + lagThreshold: 10000 + maxReplicas: 1 + minReplicas: 0 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: [] + commandLine: + CONVERT_XML: true + persistence: + enabled: false + resources: + limits: + memory: 2G + requests: + memory: 2G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-2-converter-error + outputTopic: resources-pipeline-folders-pipeline-2-converter + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-2-converter + helm_release_name: resources-pipeline-folders-pipeline-2-converter + name: converter + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-2- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + resources-pipeline-folders-pipeline-2-converter: + configs: + cleanup.policy: compact,delete + retention.ms: '-1' + partitions_count: 50 + type: output + resources-pipeline-folders-pipeline-2-converter-error: + configs: + cleanup.policy: compact,delete + partitions_count: 10 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: converter + version: 2.4.2 + +- _cleaner: + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - 
resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-5e568-clean + helm_release_name: resources-pipeline-folders-pipeline-3-a-l-5e568-clean + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + suffix: -clean + type: streams-app-cleaner + version: 2.4.2 + app: + autoscaling: + consumerGroup: filter-resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + cooldownPeriod: 300 + enabled: true + lagThreshold: 10000 + maxReplicas: 4 + minReplicas: 4 + offsetResetPolicy: earliest + pollingInterval: 30 + topics: + - 
resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + commandLine: + TYPE: nothing + image: fake-registry/filter + imageTag: 2.4.1 + persistence: + enabled: false + replicaCount: 4 + resources: + requests: + memory: 3G + statefulSet: false + streams: + brokers: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + config: + large.message.id.generator: com.bakdata.kafka.MurmurHashIdGenerator + errorTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + outputTopic: resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + schemaRegistryUrl: http://localhost:8081/ + helm_name_override: resources-pipeline-folders-pipeline-3-a-long-name-a-long--f9195 + helm_release_name: resources-pipeline-folders-pipeline-3-a-long-na-f9195 + name: a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + namespace: example-namespace + prefix: resources-pipeline-folders-pipeline-3- + repo_config: + repo_auth_flags: + insecure_skip_tls_verify: false + repository_name: bakdata-streams-bootstrap + url: https://bakdata.github.io/streams-bootstrap/ + to: + models: {} + topics: + ? resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name + : configs: + retention.ms: '-1' + partitions_count: 50 + type: output + ? 
resources-pipeline-folders-pipeline-3-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-error + : configs: + cleanup.policy: compact,delete + partitions_count: 1 + type: error + value_schema: com.bakdata.kafka.DeadLetter + type: filter + version: 2.4.2 + diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 0aa50fe3e..841d12f81 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -76,6 +76,34 @@ def test_load_pipeline(self, snapshot: Snapshot): snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_load_pipeline_with_folder_path(self, snapshot: Snapshot): + result = runner.invoke( + app, + [ + "generate", + str(RESOURCE_PATH / "pipeline-folders"), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.stdout + + snapshot.assert_match(result.stdout, "pipeline.yaml") + + def test_load_pipeline_with_multiple_pipeline_paths(self, snapshot: Snapshot): + path_1 = RESOURCE_PATH / "pipeline-folders/pipeline-1/pipeline.yaml" + path_2 = RESOURCE_PATH / "pipeline-folders/pipeline-2/pipeline.yaml" + path_3 = RESOURCE_PATH / "pipeline-folders/pipeline-3/pipeline.yaml" + result = runner.invoke( + app, + ["generate", str(path_1), str(path_2), str(path_3)], + catch_exceptions=False, + ) + + assert result.exit_code == 0, result.stdout + + snapshot.assert_match(result.stdout, "pipeline.yaml") + def test_name_equal_prefix_name_concatenation(self): result = runner.invoke( app,