Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deploy multiple pipelines #487

Merged
merged 25 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/docs/resources/variables/cli_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ KPOPS_DOTENV_PATH # No default value, not required
# Suffix your environment files with this value (e.g.
# defaults_development.yaml for environment=development).
KPOPS_ENVIRONMENT # No default value, not required
# Path to YAML with pipeline definition
KPOPS_PIPELINE_PATH # No default value, required
# Paths to dir containing 'pipeline.yaml' or files named
# 'pipeline.yaml'.
KPOPS_PIPELINE_PATHS # No default value, required
# Comma separated list of steps to apply the command on
KPOPS_PIPELINE_STEPS # No default value, not required
2 changes: 1 addition & 1 deletion docs/docs/resources/variables/cli_env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ These variables take precedence over the commands' flags. If a variable is set,
|KPOPS_CONFIG_PATH |. |False |Path to the dir containing config.yaml files |
|KPOPS_DOTENV_PATH | |False |Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. |
|KPOPS_ENVIRONMENT | |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|
|KPOPS_PIPELINE_PATH | |True |Path to YAML with pipeline definition |
|KPOPS_PIPELINE_PATHS| |True |Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. |
|KPOPS_PIPELINE_STEPS| |False |Comma separated list of steps to apply the command on |
24 changes: 12 additions & 12 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ Clean pipeline steps
**Usage**:

```console
$ kpops clean [OPTIONS] PIPELINE_PATH
$ kpops clean [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -57,12 +57,12 @@ Deploy pipeline steps
**Usage**:

```console
$ kpops deploy [OPTIONS] PIPELINE_PATH
$ kpops deploy [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -83,12 +83,12 @@ Destroy pipeline steps
**Usage**:

```console
$ kpops destroy [OPTIONS] PIPELINE_PATH
$ kpops destroy [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -109,12 +109,12 @@ Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps
**Usage**:

```console
$ kpops generate [OPTIONS] PIPELINE_PATH
$ kpops generate [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand Down Expand Up @@ -152,12 +152,12 @@ In addition to generate, render final resource representation for each pipeline
**Usage**:

```console
$ kpops manifest [OPTIONS] PIPELINE_PATH
$ kpops manifest [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -176,12 +176,12 @@ Reset pipeline steps
**Usage**:

```console
$ kpops reset [OPTIONS] PIPELINE_PATH
$ kpops reset [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand Down
159 changes: 83 additions & 76 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import kpops
from kpops import __version__
from kpops.api.options import FilterType
from kpops.cli.utils import collect_pipeline_paths
from kpops.config import ENV_PREFIX, KpopsConfig
from kpops.utils.gen_schema import (
SchemaScope,
Expand Down Expand Up @@ -41,14 +42,14 @@
help="Path to the dir containing config.yaml files",
)

PIPELINE_PATH_ARG: Path = typer.Argument(
PIPELINE_PATHS_ARG: list[Path] = typer.Argument(
default=...,
exists=True,
file_okay=True,
dir_okay=False,
dir_okay=True,
readable=True,
envvar=f"{ENV_PREFIX}PIPELINE_PATH",
help="Path to YAML with pipeline definition",
envvar=f"{ENV_PREFIX}PIPELINE_PATHS",
help="Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'.",
)

PROJECT_PATH: Path = typer.Argument(
Expand Down Expand Up @@ -160,56 +161,58 @@ def schema(
help="Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).",
)
def generate(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
filter_type: FilterType = FILTER_TYPE,
environment: Optional[str] = ENVIRONMENT,
verbose: bool = VERBOSE_OPTION,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
print_yaml(pipeline.to_yaml())
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
pipeline = kpops.generate(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
print_yaml(pipeline.to_yaml())


@app.command(
short_help="Render final resource representation",
help="In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests.",
)
def manifest(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
filter_type: FilterType = FILTER_TYPE,
environment: Optional[str] = ENVIRONMENT,
verbose: bool = VERBOSE_OPTION,
):
resources = kpops.manifest(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
resources = kpops.manifest(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest)


@app.command(help="Deploy pipeline steps")
def deploy(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -219,22 +222,23 @@ def deploy(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.deploy(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.deploy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Destroy pipeline steps")
def destroy(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -244,22 +248,23 @@ def destroy(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.destroy(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.destroy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Reset pipeline steps")
def reset(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -269,22 +274,23 @@ def reset(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.reset(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.reset(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Clean pipeline steps")
def clean(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -294,17 +300,18 @@ def clean(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.clean(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.clean(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


def version_callback(show_version: bool) -> None:
Expand Down
24 changes: 24 additions & 0 deletions kpops/cli/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from collections.abc import Iterable, Iterator
from pathlib import Path


def collect_pipeline_paths(pipeline_paths: Iterable[Path]) -> Iterator[Path]:
"""Generate paths to pipeline files.

:param pipeline_paths: The list of paths to the pipeline files or directories.

:yields: Path: Paths to pipeline files. If `pipeline_path` file yields the given path.
For a directory it yields all the pipeline.yaml paths.

:raises: ValueError: If `pipeline_path` is neither a file nor a directory.
"""
for pipeline_path in pipeline_paths:
if pipeline_path.is_file():
yield pipeline_path
elif pipeline_path.is_dir():
yield from sorted(pipeline_path.glob("**/pipeline*.yaml"))
else:
msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file."
raise ValueError(msg)
2 changes: 1 addition & 1 deletion kpops/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def remove(self, component_id: str) -> None:
self._component_index.pop(component_id)

def get(self, component_id: str) -> PipelineComponent | None:
self._component_index.get(component_id)
return self._component_index.get(component_id)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]:
"""Find pipeline components matching a custom predicate.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- type: scheduled-producer
app:
commandLine:
FAKE_ARG: "fake-arg-value"
schedule: "30 3/8 * * *"
Loading
Loading