Skip to content

Commit

Permalink
Merge branch 'v6' into refactor/kpops-resouces
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf authored May 27, 2024
2 parents 5f646f5 + 8773f62 commit b247a85
Show file tree
Hide file tree
Showing 12 changed files with 690 additions and 92 deletions.
5 changes: 3 additions & 2 deletions docs/docs/resources/variables/cli_env_vars.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ KPOPS_DOTENV_PATH # No default value, not required
# Suffix your environment files with this value (e.g.
# defaults_development.yaml for environment=development).
KPOPS_ENVIRONMENT # No default value, not required
# Path to YAML with pipeline definition
KPOPS_PIPELINE_PATH # No default value, required
# Paths to dir containing 'pipeline.yaml' or files named
# 'pipeline.yaml'.
KPOPS_PIPELINE_PATHS # No default value, required
# Comma separated list of steps to apply the command on
KPOPS_PIPELINE_STEPS # No default value, not required
2 changes: 1 addition & 1 deletion docs/docs/resources/variables/cli_env_vars.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ These variables take precedence over the commands' flags. If a variable is set,
|KPOPS_CONFIG_PATH |. |False |Path to the dir containing config.yaml files |
|KPOPS_DOTENV_PATH | |False |Path to dotenv file. Multiple files can be provided. The files will be loaded in order, with each file overriding the previous one. |
|KPOPS_ENVIRONMENT | |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|
|KPOPS_PIPELINE_PATH | |True |Path to YAML with pipeline definition |
|KPOPS_PIPELINE_PATHS| |True |Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. |
|KPOPS_PIPELINE_STEPS| |False |Comma separated list of steps to apply the command on |
24 changes: 12 additions & 12 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ Clean pipeline steps
**Usage**:

```console
$ kpops clean [OPTIONS] PIPELINE_PATH
$ kpops clean [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -57,12 +57,12 @@ Deploy pipeline steps
**Usage**:

```console
$ kpops deploy [OPTIONS] PIPELINE_PATH
$ kpops deploy [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -83,12 +83,12 @@ Destroy pipeline steps
**Usage**:

```console
$ kpops destroy [OPTIONS] PIPELINE_PATH
$ kpops destroy [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -109,12 +109,12 @@ Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps
**Usage**:

```console
$ kpops generate [OPTIONS] PIPELINE_PATH
$ kpops generate [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand Down Expand Up @@ -152,12 +152,12 @@ In addition to generate, render final resource representation for each pipeline
**Usage**:

```console
$ kpops manifest [OPTIONS] PIPELINE_PATH
$ kpops manifest [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand All @@ -176,12 +176,12 @@ Reset pipeline steps
**Usage**:

```console
$ kpops reset [OPTIONS] PIPELINE_PATH
$ kpops reset [OPTIONS] PIPELINE_PATHS...
```

**Arguments**:

* `PIPELINE_PATH`: Path to YAML with pipeline definition [env var: KPOPS_PIPELINE_PATH;required]
* `PIPELINE_PATHS...`: Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'. [env var: KPOPS_PIPELINE_PATHS;required]

**Options**:

Expand Down
159 changes: 83 additions & 76 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from kpops import __version__
from kpops.api.file_type import KpopsFileType
from kpops.api.options import FilterType
from kpops.cli.utils import collect_pipeline_paths
from kpops.config import ENV_PREFIX, KpopsConfig
from kpops.utils.gen_schema import (
gen_config_schema,
Expand Down Expand Up @@ -41,14 +42,14 @@
help="Path to the dir containing config.yaml files",
)

PIPELINE_PATH_ARG: Path = typer.Argument(
PIPELINE_PATHS_ARG: list[Path] = typer.Argument(
default=...,
exists=True,
file_okay=True,
dir_okay=False,
dir_okay=True,
readable=True,
envvar=f"{ENV_PREFIX}PIPELINE_PATH",
help="Path to YAML with pipeline definition",
envvar=f"{ENV_PREFIX}PIPELINE_PATHS",
help="Paths to dir containing 'pipeline.yaml' or files named 'pipeline.yaml'.",
)

PROJECT_PATH: Path = typer.Argument(
Expand Down Expand Up @@ -160,56 +161,58 @@ def schema(
help="Enrich pipeline steps with defaults. The enriched pipeline is used for all KPOps operations (deploy, destroy, ...).",
)
def generate(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
filter_type: FilterType = FILTER_TYPE,
environment: Optional[str] = ENVIRONMENT,
verbose: bool = VERBOSE_OPTION,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
print_yaml(pipeline.to_yaml())
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
pipeline = kpops.generate(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
print_yaml(pipeline.to_yaml())


@app.command(
short_help="Render final resource representation",
help="In addition to generate, render final resource representation for each pipeline step, e.g. Kubernetes manifests.",
)
def manifest(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
filter_type: FilterType = FILTER_TYPE,
environment: Optional[str] = ENVIRONMENT,
verbose: bool = VERBOSE_OPTION,
):
resources = kpops.manifest(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
resources = kpops.manifest(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest)


@app.command(help="Deploy pipeline steps")
def deploy(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -219,22 +222,23 @@ def deploy(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.deploy(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.deploy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Destroy pipeline steps")
def destroy(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -244,22 +248,23 @@ def destroy(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.destroy(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.destroy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Reset pipeline steps")
def reset(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -269,22 +274,23 @@ def reset(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.reset(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.reset(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Clean pipeline steps")
def clean(
pipeline_path: Path = PIPELINE_PATH_ARG,
pipeline_paths: list[Path] = PIPELINE_PATHS_ARG,
dotenv: Optional[list[Path]] = DOTENV_PATH_OPTION,
config: Path = CONFIG_PATH_OPTION,
steps: Optional[str] = PIPELINE_STEPS,
Expand All @@ -294,17 +300,18 @@ def clean(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.clean(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.clean(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


def version_callback(show_version: bool) -> None:
Expand Down
24 changes: 24 additions & 0 deletions kpops/cli/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from collections.abc import Iterable, Iterator
from pathlib import Path


def collect_pipeline_paths(pipeline_paths: Iterable[Path]) -> Iterator[Path]:
"""Generate paths to pipeline files.
:param pipeline_paths: The list of paths to the pipeline files or directories.
:yields: Path: Paths to pipeline files. If `pipeline_path` file yields the given path.
For a directory it yields all the pipeline.yaml paths.
:raises: ValueError: If `pipeline_path` is neither a file nor a directory.
"""
for pipeline_path in pipeline_paths:
if pipeline_path.is_file():
yield pipeline_path
elif pipeline_path.is_dir():
yield from sorted(pipeline_path.glob("**/pipeline*.yaml"))
else:
msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file."
raise ValueError(msg)
2 changes: 1 addition & 1 deletion kpops/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def remove(self, component_id: str) -> None:
self._component_index.pop(component_id)

def get(self, component_id: str) -> PipelineComponent | None:
self._component_index.get(component_id)
return self._component_index.get(component_id)

def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]:
"""Find pipeline components matching a custom predicate.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- type: scheduled-producer
app:
commandLine:
FAKE_ARG: "fake-arg-value"
schedule: "30 3/8 * * *"
Loading

0 comments on commit b247a85

Please sign in to comment.