Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deploy multiple pipelines #487

Merged
merged 25 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 74 additions & 67 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import kpops
from kpops import __version__
from kpops.api.options import FilterType
from kpops.cli.utils import collect_pipeline_paths
from kpops.config import ENV_PREFIX, KpopsConfig
from kpops.utils.gen_schema import (
SchemaScope,
Expand Down Expand Up @@ -45,7 +46,7 @@
default=...,
exists=True,
file_okay=True,
dir_okay=False,
dir_okay=True,
readable=True,
envvar=f"{ENV_PREFIX}PIPELINE_PATH",
help="Path to YAML with pipeline definition",
Expand Down Expand Up @@ -168,16 +169,17 @@ def generate(
environment: Optional[str] = ENVIRONMENT,
verbose: bool = VERBOSE_OPTION,
):
pipeline = kpops.generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
print_yaml(pipeline.to_yaml())
for pipeline_file_path in collect_pipeline_paths(pipeline_path):
pipeline = kpops.generate(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
print_yaml(pipeline.to_yaml())


@app.command(
Expand All @@ -193,18 +195,19 @@ def manifest(
environment: Optional[str] = ENVIRONMENT,
verbose: bool = VERBOSE_OPTION,
):
resources = kpops.manifest(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest)
for pipeline_file_path in collect_pipeline_paths(pipeline_path):
resources = kpops.manifest(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
verbose=verbose,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest)


@app.command(help="Deploy pipeline steps")
Expand All @@ -219,17 +222,18 @@ def deploy(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.deploy(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_path):
kpops.deploy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Destroy pipeline steps")
Expand All @@ -244,17 +248,18 @@ def destroy(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.destroy(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_path):
kpops.destroy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Reset pipeline steps")
Expand All @@ -269,17 +274,18 @@ def reset(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.reset(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_path):
kpops.reset(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


@app.command(help="Clean pipeline steps")
Expand All @@ -294,17 +300,18 @@ def clean(
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
):
kpops.clean(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
for pipeline_file_path in collect_pipeline_paths(pipeline_path):
kpops.clean(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)


def version_callback(show_version: bool) -> None:
Expand Down
23 changes: 23 additions & 0 deletions kpops/cli/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from __future__ import annotations

from collections.abc import Iterator
from pathlib import Path


def collect_pipeline_paths(pipeline_path: Path) -> Iterator[Path]:
"""Generate paths to pipeline files.

:param pipeline_path: The path to the pipeline file or directory.

:yields: Path: Paths to pipeline files. If `pipeline_path` file yields the given path.
For a directory it yields all the pipeline.yaml paths.

:raises: RuntimeError: If `pipeline_path` is neither a file nor a directory.
"""
if pipeline_path.is_file():
yield pipeline_path
elif pipeline_path.is_dir():
yield from pipeline_path.glob("**/pipeline*.yaml")
else:
msg = f"The entered pipeline path '{pipeline_path}' should be a directory or file."
raise RuntimeError(msg)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion kpops/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def remove(self, component_id: str) -> None:
self._component_index.pop(component_id)

def get(self, component_id: str) -> PipelineComponent | None:
self._component_index.get(component_id)
return self._component_index.get(component_id)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]:
"""Find pipeline components matching a custom predicate.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- type: scheduled-producer
app:
commandLine:
FAKE_ARG: "fake-arg-value"
schedule: "30 3/8 * * *"
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- type: converter
app:
commandLine:
CONVERT_XML: true
resources:
limits:
memory: 2G
requests:
memory: 2G
12 changes: 12 additions & 0 deletions tests/pipeline/resources/pipeline-folders/pipeline-3/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
- type: filter
name: "a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name-a-long-name"
app:
commandLine:
TYPE: "nothing"
resources:
requests:
memory: 3G
replicaCount: 4
autoscaling:
minReplicas: 4
maxReplicas: 4
Loading
Loading