From 74dd373507c295e5365420b5d90b8a710e7c2603 Mon Sep 17 00:00:00 2001 From: Ivan Yordanov Date: Mon, 9 Oct 2023 10:58:23 +0300 Subject: [PATCH 1/4] Introduce ruff (#363) Replaces `isort`, `pyupgrade`, `flake8` with `ruff` --- .github/ruff-matcher.json | 17 ++++ .github/scripts/__init__.py | 0 .github/workflows/ci.yaml | 20 ++-- .pre-commit-config.yaml | 31 ++---- README.md | 1 + docs/docs/schema/config.json | 2 +- docs/docs/schema/pipeline.json | 44 ++++----- hooks/__init__.py | 2 +- hooks/gen_docs/__init__.py | 2 +- hooks/gen_schema.py | 4 +- kpops/cli/main.py | 8 +- kpops/cli/registry.py | 27 ++--- kpops/component_handlers/__init__.py | 7 +- .../helm_wrapper/dry_run_handler.py | 2 +- kpops/component_handlers/helm_wrapper/helm.py | 44 +++++---- .../helm_wrapper/helm_diff.py | 5 +- .../component_handlers/helm_wrapper/model.py | 17 ++-- .../component_handlers/helm_wrapper/utils.py | 6 +- .../kafka_connect/connect_wrapper.py | 49 ++++++---- .../kafka_connect/kafka_connect_handler.py | 27 ++--- .../component_handlers/kafka_connect/model.py | 5 +- .../kafka_connect/timeout.py | 14 +-- .../schema_handler/schema_handler.py | 25 +++-- .../schema_handler/schema_provider.py | 5 +- kpops/component_handlers/topic/handler.py | 18 ++-- .../component_handlers/topic/proxy_wrapper.py | 81 +++++++++------ kpops/component_handlers/utils/exception.py | 6 +- .../base_defaults_component.py | 19 ++-- kpops/components/base_components/kafka_app.py | 14 +-- .../base_components/kafka_connector.py | 39 ++++---- .../base_components/kubernetes_app.py | 26 ++--- .../base_components/models/from_section.py | 11 ++- .../base_components/models/to_section.py | 11 ++- .../base_components/pipeline_component.py | 39 ++++---- .../streams_bootstrap/producer/model.py | 4 +- .../producer/producer_app.py | 7 +- .../streams_bootstrap/streams/model.py | 13 +-- .../streams_bootstrap/streams/streams_app.py | 6 +- kpops/pipeline_generator/pipeline.py | 75 +++++++------- kpops/utils/dict_differ.py | 18 ++-- kpops/utils/dict_ops.py | 20 ++-- kpops/utils/docstring.py | 6 +- kpops/utils/environment.py | 2 +- kpops/utils/gen_schema.py | 22 +++-- kpops/utils/yaml_loading.py | 7 +- poetry.lock | 96 ++++++------------ pyproject.toml | 98 ++++++++++++++++++- setup.cfg | 19 ---- .../snapshots/snap_test_schema_generation.py | 16 +-- tests/cli/test_schema_generation.py | 13 +-- tests/compiler/test_pipeline_name.py | 42 ++++---- .../helm_wrapper/test_dry_run_handler.py | 4 +- .../helm_wrapper/test_helm_wrapper.py | 11 +-- .../kafka_connect/test_connect_handler.py | 10 +- .../kafka_connect/test_connect_wrapper.py | 8 +- .../schema_handler/test_schema_handler.py | 56 ++++++----- .../topic/test_proxy_wrapper.py | 17 ++-- .../topic/test_topic_handler.py | 35 +++---- .../test_base_defaults_component.py | 4 +- tests/components/test_kafka_app.py | 4 +- tests/components/test_kafka_connector.py | 14 +-- tests/components/test_kafka_sink_connector.py | 4 +- .../components/test_kafka_source_connector.py | 2 +- tests/components/test_kubernetes_app.py | 18 ++-- tests/components/test_producer_app.py | 6 +- tests/components/test_streams_app.py | 14 ++- tests/pipeline/test_components/components.py | 5 +- tests/pipeline/test_pipeline.py | 3 +- tests/pipeline/test_template.py | 2 +- tests/utils/test_environment.py | 4 +- 70 files changed, 702 insertions(+), 611 deletions(-) create mode 100644 .github/ruff-matcher.json create mode 100644 .github/scripts/__init__.py delete mode 100644 setup.cfg diff --git a/.github/ruff-matcher.json b/.github/ruff-matcher.json new file 
mode 100644 index 000000000..bc3b10738 --- /dev/null +++ b/.github/ruff-matcher.json @@ -0,0 +1,17 @@ +{ + "problemMatcher": [ + { + "owner": "ruff", + "pattern": [ + { + "regexp": "^(.*):(\\d+):(\\d+):\\s([\\da-zA-Z]+)\\s(.*)$", + "file": 1, + "line": 2, + "column": 3, + "code": 4, + "message": 5 + } + ] + } + ] +} diff --git a/.github/scripts/__init__.py b/.github/scripts/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index baa091133..fe87e8436 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -35,11 +35,16 @@ jobs: - name: Install dependencies run: poetry install --no-interaction - - name: Lint (flake8) - run: poetry run pre-commit run flake8 --all-files --show-diff-on-failure - - - name: Order of imports (isort) - run: poetry run pre-commit run isort --all-files --show-diff-on-failure + - name: Lint (ruff) + shell: bash + run: | + if [[ "$RUNNER_OS" == "Linux" && "${{ matrix.python-version }}" == "3.10" ]] + then + echo "::add-matcher::.github/ruff-matcher.json" + poetry run ruff check . --config pyproject.toml --output-format text --no-fix + else + poetry run pre-commit run ruff --all-files --show-diff-on-failure + fi; - name: Formatting (black) run: poetry run pre-commit run black --all-files --show-diff-on-failure @@ -59,11 +64,6 @@ jobs: - name: Generate pipeline definitions run: poetry run pre-commit run gen-docs-components --all-files --show-diff-on-failure - # TODO: enable when PEP 604 incompatibilty is in typer is resolved https://github.com/tiangolo/typer/issues/348 - # See https://github.com/tiangolo/typer/pull/522 - # - name: Syntax (pyupgrade) - # run: poetry run pre-commit run --hook-stage manual pyupgrade --all-files - - name: Test run: poetry run pytest tests diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 59639ac35..8c709b20a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,12 +1,14 @@ repos: - - repo: https://github.com/pycqa/isort - rev: 5.12.0 - hooks: - - id: isort - args: ["--settings", "setup.cfg"] - exclude: ^tests/.*snapshots/ - repo: local hooks: + - id: ruff + name: ruff + entry: ruff check . 
+ args: [ --config, pyproject.toml, --fix, --show-fixes, --exit-non-zero-on-fix ] + language: system + types_or: [python] + require_serial: true # run once for all files + pass_filenames: false - id: black name: black entry: black @@ -14,14 +16,6 @@ repos: types_or: [python, pyi] require_serial: true # run once for all files exclude: ^tests/.*snapshots/ - - repo: https://github.com/pycqa/flake8 - rev: 5.0.4 - hooks: - - id: flake8 - args: ["--config", "setup.cfg"] - exclude: ^tests/.*snapshots/ - - repo: local - hooks: - id: pyright name: pyright entry: pyright @@ -29,15 +23,6 @@ repos: types: [python] require_serial: true # run once for all files exclude: ^tests/.*snapshots/ - - repo: https://github.com/asottile/pyupgrade - rev: v3.1.0 - hooks: - - id: pyupgrade - stages: [manual] - args: ["--py310-plus"] - exclude: ^tests/.*snapshots/ - - repo: local - hooks: - id: gen-schema name: gen-schema entry: python hooks/gen_schema.py diff --git a/README.md b/README.md index 9d2aaca2e..9dd25fd9c 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ [![Build status](https://github.com/bakdata/kpops/actions/workflows/ci.yaml/badge.svg)](https://github.com/bakdata/kpops/actions/workflows/ci.yaml) [![pypi](https://img.shields.io/pypi/v/kpops.svg)](https://pypi.org/project/kpops) [![versions](https://img.shields.io/pypi/pyversions/kpops.svg)](https://github.com/bakdata/kpops) +[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![license](https://img.shields.io/github/license/bakdata/kpops.svg)](https://github.com/bakdata/kpops/blob/main/LICENSE) ## Key features diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index b77b4e850..a2f18eb6b 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -2,7 +2,7 @@ "$ref": "#/definitions/PipelineConfig", "definitions": { "HelmConfig": { - "description": "Global Helm configuration", + "description": "Global Helm configuration.", "properties": { "api_version": { "description": "Kubernetes API version used for Capabilities.APIVersions", diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 9695ec9a2..7e77b0ddd 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -2,7 +2,7 @@ "definitions": { "FromSection": { "additionalProperties": false, - "description": "Holds multiple input topics", + "description": "Holds multiple input topics.", "properties": { "components": { "additionalProperties": { @@ -28,7 +28,7 @@ }, "FromTopic": { "additionalProperties": false, - "description": "Input topic", + "description": "Input topic.", "properties": { "role": { "description": "Custom identifier belonging to a topic; define only if `type` is `pattern` or `None`", @@ -48,7 +48,7 @@ "type": "object" }, "HelmRepoConfig": { - "description": "Helm repository configuration", + "description": "Helm repository configuration.", "properties": { "repo_auth_flags": { "allOf": [ @@ -85,7 +85,7 @@ "type": "object" }, "InputTopicTypes": { - "description": "Input topic types\n\nINPUT (input topic), PATTERN (extra-topic-pattern or input-topic-pattern)", + "description": "Input topic types.\n\nINPUT (input topic), PATTERN (extra-topic-pattern or input-topic-pattern)", "enum": [ "input", "pattern" @@ -97,7 +97,7 @@ "additionalProperties": { "type": "string" }, - "description": "Settings specific to Kafka Connectors", + "description": "Settings specific to Kafka Connectors.", "properties": 
{ "connector.class": { "title": "Connector.Class", @@ -111,7 +111,7 @@ "type": "object" }, "KafkaSinkConnector": { - "description": "Kafka sink connector model", + "description": "Kafka sink connector model.", "properties": { "app": { "allOf": [ @@ -183,7 +183,7 @@ }, "type": { "default": "kafka-sink-connector", - "description": "Kafka sink connector model", + "description": "Kafka sink connector model.", "enum": [ "kafka-sink-connector" ], @@ -206,7 +206,7 @@ "type": "object" }, "KafkaSourceConnector": { - "description": "Kafka source connector model", + "description": "Kafka source connector model.", "properties": { "app": { "allOf": [ @@ -283,7 +283,7 @@ }, "type": { "default": "kafka-source-connector", - "description": "Kafka source connector model", + "description": "Kafka source connector model.", "enum": [ "kafka-source-connector" ], @@ -384,13 +384,13 @@ "type": "object" }, "KubernetesAppConfig": { - "description": "Settings specific to Kubernetes Apps", + "description": "Settings specific to Kubernetes Apps.", "properties": {}, "title": "KubernetesAppConfig", "type": "object" }, "OutputTopicTypes": { - "description": "Types of output topic\n\nOUTPUT (output topic), ERROR (error topic)", + "description": "Types of output topic.\n\nOUTPUT (output topic), ERROR (error topic)", "enum": [ "output", "error" @@ -399,7 +399,7 @@ "type": "string" }, "ProducerApp": { - "description": "Producer component\nThis producer holds configuration to use as values for the streams bootstrap producer helm chart. Note that the producer does not support error topics.", + "description": "Producer component.\nThis producer holds configuration to use as values for the streams bootstrap producer helm chart. Note that the producer does not support error topics.", "properties": { "app": { "allOf": [ @@ -462,7 +462,7 @@ }, "type": { "default": "producer-app", - "description": "Producer component\nThis producer holds configuration to use as values for the streams bootstrap producer helm chart. Note that the producer does not support error topics.", + "description": "Producer component.\nThis producer holds configuration to use as values for the streams bootstrap producer helm chart. 
Note that the producer does not support error topics.", "enum": [ "producer-app" ], @@ -485,7 +485,7 @@ "type": "object" }, "ProducerStreamsConfig": { - "description": "Kafka Streams settings specific to Producer", + "description": "Kafka Streams settings specific to Producer.", "properties": { "brokers": { "description": "Brokers", @@ -519,7 +519,7 @@ "type": "object" }, "ProducerValues": { - "description": "Settings specific to producers", + "description": "Settings specific to producers.", "properties": { "nameOverride": { "description": "Override name with this value", @@ -543,7 +543,7 @@ "type": "object" }, "RepoAuthFlags": { - "description": "Authorisation-related flags for `helm repo`", + "description": "Authorisation-related flags for `helm repo`.", "properties": { "ca_file": { "description": "Path to CA bundle file to verify certificates of HTTPS-enabled servers", @@ -578,7 +578,7 @@ "type": "object" }, "StreamsApp": { - "description": "StreamsApp component that configures a streams bootstrap app", + "description": "StreamsApp component that configures a streams bootstrap app.", "properties": { "app": { "allOf": [ @@ -645,7 +645,7 @@ }, "type": { "default": "streams-app", - "description": "StreamsApp component that configures a streams bootstrap app", + "description": "StreamsApp component that configures a streams bootstrap app.", "enum": [ "streams-app" ], @@ -668,7 +668,7 @@ "type": "object" }, "StreamsAppAutoScaling": { - "description": "Kubernetes Event-driven Autoscaling config", + "description": "Kubernetes Event-driven Autoscaling config.", "properties": { "consumerGroup": { "description": "Name of the consumer group used for checking the offset on the topic and processing the related lag.", @@ -771,7 +771,7 @@ "type": "object" }, "StreamsConfig": { - "description": "Streams Bootstrap streams section", + "description": "Streams Bootstrap streams section.", "properties": { "brokers": { "description": "Brokers", @@ -854,7 +854,7 @@ "type": "object" }, "ToSection": { - "description": "Holds multiple output topics", + "description": "Holds multiple output topics.", "properties": { "models": { "additionalProperties": { @@ -880,7 +880,7 @@ }, "TopicConfig": { "additionalProperties": false, - "description": "Configure an output topic", + "description": "Configure an output topic.", "properties": { "configs": { "additionalProperties": { diff --git a/hooks/__init__.py b/hooks/__init__.py index 0ae8ea143..ef17ce38a 100644 --- a/hooks/__init__.py +++ b/hooks/__init__.py @@ -1,4 +1,4 @@ -"""KPOps pre-commit hooks""" +"""KPOps pre-commit hooks.""" from pathlib import Path PATH_ROOT = Path(__file__).parents[1] diff --git a/hooks/gen_docs/__init__.py b/hooks/gen_docs/__init__.py index 5052e8077..5a0d63a28 100644 --- a/hooks/gen_docs/__init__.py +++ b/hooks/gen_docs/__init__.py @@ -5,7 +5,7 @@ class IterableStrEnum(str, Enum): - """Polyfill that also introduces dict-like behavior + """Polyfill that also introduces dict-like behavior. Introduces constructors that return a ``Iterator`` object either containing all items, only their names or their values. 
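Editorial note: the bulk of this patch is docstrings gaining a trailing period and a rewritten summary line. That is consistent with ruff's pydocstyle (D) rules being enabled; the specific rule codes below are inferred, not stated anywhere in the patch. A minimal sketch of the style the diff converges on:

```python
class TopicConfig:
    """Configure an output topic.

    The first line is one short summary sentence ending in a period
    (pydocstyle D415), separated from any further detail by a blank
    line (D205) -- matching the rewritten docstrings in this diff.
    """
```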
diff --git a/hooks/gen_schema.py b/hooks/gen_schema.py index 8fc24f938..7d6b99f2e 100644 --- a/hooks/gen_schema.py +++ b/hooks/gen_schema.py @@ -1,4 +1,4 @@ -"""Generates the stock KPOps editor integration schemas""" +"""Generates the stock KPOps editor integration schemas.""" from contextlib import redirect_stdout from io import StringIO from pathlib import Path @@ -10,7 +10,7 @@ def gen_schema(scope: SchemaScope): - """Generates the specified schema and saves it to a file. + """Generate the specified schema and save it to a file. The file is located in docs/docs/schema and is named ``.json`` diff --git a/kpops/cli/main.py b/kpops/cli/main.py index f58808cd2..c7f0e26a1 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -3,7 +3,7 @@ import logging from enum import Enum from pathlib import Path -from typing import TYPE_CHECKING, Iterator, Optional +from typing import TYPE_CHECKING, Optional import dtyper import typer @@ -23,6 +23,8 @@ from kpops.utils.gen_schema import SchemaScope, gen_config_schema, gen_pipeline_schema if TYPE_CHECKING: + from collections.abc import Iterator + from kpops.components.base_components import PipelineComponent LOG_DIVIDER = "#" * 100 @@ -30,7 +32,7 @@ app = dtyper.Typer(pretty_exceptions_enable=False) BASE_DIR_PATH_OPTION: Path = typer.Option( - default=Path("."), + default=Path(), exists=True, dir_okay=True, file_okay=False, @@ -371,7 +373,7 @@ def clean( def version_callback(show_version: bool) -> None: if show_version: typer.echo(f"KPOps {__version__}") - raise typer.Exit() + raise typer.Exit @app.callback() diff --git a/kpops/cli/registry.py b/kpops/cli/registry.py index 410aa1be5..a97e2cd91 100644 --- a/kpops/cli/registry.py +++ b/kpops/cli/registry.py @@ -2,22 +2,24 @@ import importlib import inspect -import os import sys -from collections.abc import Iterator from dataclasses import dataclass, field -from typing import TypeVar +from pathlib import Path +from typing import TYPE_CHECKING, TypeVar from kpops import __name__ from kpops.cli.exception import ClassNotFoundError from kpops.components.base_components.pipeline_component import PipelineComponent +if TYPE_CHECKING: + from collections.abc import Iterator + KPOPS_MODULE = __name__ + "." T = TypeVar("T") ClassDict = dict[str, type[T]] # type -> class -sys.path.append(os.getcwd()) +sys.path.append(str(Path.cwd())) @dataclass @@ -27,9 +29,9 @@ class Registry: _classes: ClassDict[PipelineComponent] = field(default_factory=dict, init=False) def find_components(self, module_name: str) -> None: - """ - Find all PipelineComponent subclasses in module - :param module_name: name of the python module + """Find all PipelineComponent subclasses in module. + + :param module_name: name of the python module. 
""" for _class in _find_classes(module_name, PipelineComponent): self._classes[_class.type] = _class @@ -37,17 +39,16 @@ def find_components(self, module_name: str) -> None: def __getitem__(self, component_type: str) -> type[PipelineComponent]: try: return self._classes[component_type] - except KeyError: - raise ClassNotFoundError( - f"Could not find a component of type {component_type}" - ) + except KeyError as ke: + msg = f"Could not find a component of type {component_type}" + raise ClassNotFoundError(msg) from ke def find_class(module_name: str, baseclass: type[T]) -> type[T]: try: return next(_find_classes(module_name, baseclass)) - except StopIteration: - raise ClassNotFoundError + except StopIteration as e: + raise ClassNotFoundError from e def _find_classes(module_name: str, baseclass: type[T]) -> Iterator[type[T]]: diff --git a/kpops/component_handlers/__init__.py b/kpops/component_handlers/__init__.py index 988ca7ee7..fa296a574 100644 --- a/kpops/component_handlers/__init__.py +++ b/kpops/component_handlers/__init__.py @@ -2,11 +2,10 @@ from typing import TYPE_CHECKING -from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( - KafkaConnectHandler, -) - if TYPE_CHECKING: + from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( + KafkaConnectHandler, + ) from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler diff --git a/kpops/component_handlers/helm_wrapper/dry_run_handler.py b/kpops/component_handlers/helm_wrapper/dry_run_handler.py index 8e260f7df..2d28957b7 100644 --- a/kpops/component_handlers/helm_wrapper/dry_run_handler.py +++ b/kpops/component_handlers/helm_wrapper/dry_run_handler.py @@ -11,7 +11,7 @@ def __init__(self, helm: Helm, helm_diff: HelmDiff, namespace: str) -> None: self.namespace = namespace def print_helm_diff(self, stdout: str, helm_release_name: str, log: Logger) -> None: - """Print the diff of the last and current release of this component + """Print the diff of the last and current release of this component. :param stdout: The output of a Helm command that installs or upgrades the release :param helm_release_name: The Helm release name diff --git a/kpops/component_handlers/helm_wrapper/helm.py b/kpops/component_handlers/helm_wrapper/helm.py index 2ad3f5f01..b1b101b41 100644 --- a/kpops/component_handlers/helm_wrapper/helm.py +++ b/kpops/component_handlers/helm_wrapper/helm.py @@ -4,8 +4,7 @@ import re import subprocess import tempfile -from collections.abc import Iterator -from typing import Iterable +from typing import TYPE_CHECKING import yaml @@ -20,6 +19,9 @@ Version, ) +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator + log = logging.getLogger("Helm") @@ -29,16 +31,17 @@ def __init__(self, helm_config: HelmConfig) -> None: self._debug = helm_config.debug self._version = self.get_version() if self._version.major != 3: - raise RuntimeError( - f"The supported Helm version is 3.x.x. The current Helm version is {self._version.major}.{self._version.minor}.{self._version.patch}" - ) + msg = f"The supported Helm version is 3.x.x. 
The current Helm version is {self._version.major}.{self._version.minor}.{self._version.patch}" + raise RuntimeError(msg) def add_repo( self, repository_name: str, repository_url: str, - repo_auth_flags: RepoAuthFlags = RepoAuthFlags(), + repo_auth_flags: RepoAuthFlags | None = None, ) -> None: + if repo_auth_flags is None: + repo_auth_flags = RepoAuthFlags() command = [ "helm", "repo", @@ -50,7 +53,7 @@ def add_repo( try: self.__execute(command) - except Exception as e: + except (ReleaseNotFoundException, RuntimeError) as e: if ( len(e.args) == 1 and re.match( @@ -59,9 +62,9 @@ def add_repo( ) is not None ): - log.error(f"Could not add repository {repository_name}. {e}") + log.exception(f"Could not add repository {repository_name}.") else: - raise e + raise if self._version.minor > 7: self.__execute(["helm", "repo", "update", repository_name]) @@ -75,9 +78,11 @@ def upgrade_install( dry_run: bool, namespace: str, values: dict, - flags: HelmUpgradeInstallFlags = HelmUpgradeInstallFlags(), + flags: HelmUpgradeInstallFlags | None = None, ) -> str: - """Prepares and executes the `helm upgrade --install` command""" + """Prepare and execute the `helm upgrade --install` command.""" + if flags is None: + flags = HelmUpgradeInstallFlags() with tempfile.NamedTemporaryFile("w") as values_file: yaml.safe_dump(values, values_file) @@ -103,7 +108,7 @@ def uninstall( release_name: str, dry_run: bool, ) -> str | None: - """Prepares and executes the helm uninstall command""" + """Prepare and execute the helm uninstall command.""" command = [ "helm", "uninstall", @@ -126,7 +131,7 @@ def template( chart: str, namespace: str, values: dict, - flags: HelmTemplateFlags = HelmTemplateFlags(), + flags: HelmTemplateFlags | None = None, ) -> str: """From HELM: Render chart templates locally and display the output. 
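Editorial note: the signature changes above replace call defaults such as `RepoAuthFlags()` with a `None` sentinel plus an in-body fallback. This is the usual fix for ruff's flake8-bugbear B008 (function call in argument default); the rule code is inferred, and the model below is an illustrative stand-in, not the real kpops class.

```python
from dataclasses import dataclass


@dataclass
class RepoAuthFlags:
    """Illustrative stand-in for the real pydantic model."""

    insecure_skip_tls_verify: bool = False


# Before: `repo_auth_flags: RepoAuthFlags = RepoAuthFlags()` evaluates the
# call once at definition time, so every call relying on the default shares
# a single instance. After: construct a fresh instance per call.
def add_repo(name: str, url: str, repo_auth_flags: RepoAuthFlags | None = None) -> None:
    if repo_auth_flags is None:
        repo_auth_flags = RepoAuthFlags()
    print(f"helm repo add {name} {url} ({repo_auth_flags})")
```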
@@ -141,6 +146,8 @@ def template( :param flags: the flags to be set for `helm template`, defaults to HelmTemplateFlags() :return: the output of `helm template` """ + if flags is None: + flags = HelmTemplateFlags() with tempfile.NamedTemporaryFile("w") as values_file: yaml.safe_dump(values, values_file) command = [ @@ -177,9 +184,8 @@ def get_version(self) -> Version: short_version = self.__execute(command) version_match = re.search(r"^v(\d+(?:\.\d+){0,2})", short_version) if version_match is None: - raise RuntimeError( - f"Could not parse the Helm version.\n\nHelm output:\n{short_version}" - ) + msg = f"Could not parse the Helm version.\n\nHelm output:\n{short_version}" + raise RuntimeError(msg) version = map(int, version_match.group(1).split(".")) return Version(*version) @@ -206,8 +212,8 @@ def __execute(self, command: list[str]) -> str: log.debug(f"Executing {' '.join(command)}") process = subprocess.run( command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + check=True, + capture_output=True, text=True, ) Helm.parse_helm_command_stderr_output(process.stderr) @@ -228,7 +234,7 @@ def parse_helm_command_stderr_output(stderr: str) -> None: for line in stderr.splitlines(): lower = line.lower() if "release: not found" in lower: - raise ReleaseNotFoundException() + raise ReleaseNotFoundException elif "error" in lower: raise RuntimeError(stderr) elif "warning" in lower: diff --git a/kpops/component_handlers/helm_wrapper/helm_diff.py b/kpops/component_handlers/helm_wrapper/helm_diff.py index e778a7df2..26de5613a 100644 --- a/kpops/component_handlers/helm_wrapper/helm_diff.py +++ b/kpops/component_handlers/helm_wrapper/helm_diff.py @@ -1,6 +1,5 @@ import logging -from collections.abc import Iterator -from typing import Iterable +from collections.abc import Iterable, Iterator from kpops.component_handlers.helm_wrapper.model import HelmDiffConfig, HelmTemplate from kpops.utils.dict_differ import Change, render_diff @@ -17,7 +16,7 @@ def calculate_changes( current_release: Iterable[HelmTemplate], new_release: Iterable[HelmTemplate], ) -> Iterator[Change[dict]]: - """Compare 2 releases and generate a Change object for each difference + """Compare 2 releases and generate a Change object for each difference. :param current_release: Iterable containing HelmTemplate objects for the current release :param new_release: Iterable containing HelmTemplate objects for the new release diff --git a/kpops/component_handlers/helm_wrapper/model.py b/kpops/component_handlers/helm_wrapper/model.py index a8aaf8906..af21abb3f 100644 --- a/kpops/component_handlers/helm_wrapper/model.py +++ b/kpops/component_handlers/helm_wrapper/model.py @@ -1,6 +1,6 @@ +from collections.abc import Iterator from dataclasses import dataclass from pathlib import Path -from typing import Iterator import yaml from pydantic import BaseConfig, BaseModel, Extra, Field @@ -20,7 +20,7 @@ class HelmDiffConfig(BaseModel): class RepoAuthFlags(BaseModel): - """Authorisation-related flags for `helm repo` + """Authorisation-related flags for `helm repo`. :param username: Username, defaults to None :param password: Password, defaults to None @@ -65,7 +65,7 @@ def to_command(self) -> list[str]: class HelmRepoConfig(BaseModel): - """Helm repository configuration + """Helm repository configuration. :param repository_name: Name of the Helm repository :param url: URL to the Helm repository @@ -85,7 +85,7 @@ class Config(DescConfig): class HelmConfig(BaseModel): - """Global Helm configuration + """Global Helm configuration. 
:param context: Name of kubeconfig context (`--kube-context`) :param debug: Run Helm in Debug mode @@ -180,7 +180,8 @@ def parse_source(source: str) -> str: # Source: chart/templates/serviceaccount.yaml """ if not source.startswith(HELM_SOURCE_PREFIX): - raise ParseError("Not a valid Helm template source") + msg = "Not a valid Helm template source" + raise ParseError(msg) return source.removeprefix(HELM_SOURCE_PREFIX).strip() @classmethod @@ -205,9 +206,9 @@ def __iter__(self) -> Iterator[str]: @property def manifest(self) -> str: - """ - Reads the manifest section of Helm stdout. `helm upgrade --install` output message contains three sections - in the following order: + """Reads the manifest section of Helm stdout. + + `helm upgrade --install` output message contains three sections in the following order: - HOOKS - MANIFEST diff --git a/kpops/component_handlers/helm_wrapper/utils.py b/kpops/component_handlers/helm_wrapper/utils.py index d39536041..7ad76b93a 100644 --- a/kpops/component_handlers/helm_wrapper/utils.py +++ b/kpops/component_handlers/helm_wrapper/utils.py @@ -7,11 +7,11 @@ def trim_release_name(name: str, suffix: str = "") -> str: - """ - Trim Helm release name while preserving suffix. + """Trim Helm release name while preserving suffix. + :param name: The release name including optional suffix :param suffix: The release suffix to preserve - :return: Truncated release name + :return: Truncated release name. """ if len(name) > RELEASE_NAME_MAX_LEN: new_name = name[: (RELEASE_NAME_MAX_LEN - len(suffix))] + suffix diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py index 9a3dd307e..13f02a80d 100644 --- a/kpops/component_handlers/kafka_connect/connect_wrapper.py +++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py @@ -20,9 +20,7 @@ class ConnectWrapper: - """ - Wraps Kafka Connect APIs - """ + """Wraps Kafka Connect APIs.""" def __init__(self, host: str | None): if not host: @@ -40,11 +38,11 @@ def host(self) -> str: def create_connector( self, connector_config: KafkaConnectorConfig ) -> KafkaConnectResponse: - """ - Creates a new connector + """Create a new connector. + API Reference: https://docs.confluent.io/platform/current/connect/references/restapi.html#post--connectors :param connector_config: The config of the connector - :return: The current connector info if successful + :return: The current connector info if successful. """ config_json = connector_config.dict() connect_data = {"name": connector_config.name, "config": config_json} @@ -64,11 +62,13 @@ def create_connector( raise KafkaConnectError(response) def get_connector(self, connector_name: str) -> KafkaConnectResponse: - """ - Get information about the connector. - API Reference: https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name) + """Get information about the connector. + + API Reference: + https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name) + :param connector_name: Name of the created connector - :return: Information about the connector + :return: Information about the connector. 
""" response = httpx.get( url=f"{self._host}/connectors/{connector_name}", headers=HEADERS @@ -79,7 +79,7 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse: return KafkaConnectResponse(**response.json()) elif response.status_code == httpx.codes.NOT_FOUND: log.info(f"The named connector {connector_name} does not exists.") - raise ConnectorNotFoundException() + raise ConnectorNotFoundException elif response.status_code == httpx.codes.CONFLICT: log.warning( "Rebalancing in progress while getting a connector... Retrying..." @@ -91,8 +91,11 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse: def update_connector_config( self, connector_config: KafkaConnectorConfig ) -> KafkaConnectResponse: - """ - Create a new connector using the given configuration, or update the configuration for an existing connector. + """Create or update a connector. + + Create a new connector using the given configuration,or update the + configuration for an existing connector. + :param connector_config: Configuration parameters for the connector. :return: Information about the connector after the change has been made. """ @@ -123,10 +126,11 @@ def update_connector_config( def validate_connector_config( self, connector_config: KafkaConnectorConfig ) -> list[str]: - """ - Validate connector config using the given configuration + """Validate connector config using the given configuration. + :param connector_config: Configuration parameters for the connector. - :return: + :raises KafkaConnectError: Kafka Konnect error + :return: List of all found errors """ response = httpx.put( url=f"{self._host}/connector-plugins/{connector_config.class_name}/config/validate", @@ -139,7 +143,7 @@ def validate_connector_config( **response.json() ) - errors = [] + errors: list[str] = [] if kafka_connect_error_response.error_count > 0: for config in kafka_connect_error_response.configs: if len(config.value.errors) > 0: @@ -151,9 +155,12 @@ def validate_connector_config( raise KafkaConnectError(response) def delete_connector(self, connector_name: str) -> None: - """ - Deletes a connector, halting all tasks and deleting its configuration. - API Reference:https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-(string-name)- + """Delete a connector, halting all tasks and deleting its configuration. + + API Reference: + https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-(string-name)-. + :param connector_name: Configuration parameters for the connector. + :raises ConnectorNotFoundException: Connector not found """ response = httpx.delete( url=f"{self._host}/connectors/{connector_name}", headers=HEADERS @@ -163,7 +170,7 @@ def delete_connector(self, connector_name: str) -> None: return elif response.status_code == httpx.codes.NOT_FOUND: log.info(f"The named connector {connector_name} does not exists.") - raise ConnectorNotFoundException() + raise ConnectorNotFoundException elif response.status_code == httpx.codes.CONFLICT: log.warning( "Rebalancing in progress while deleting a connector... Retrying..." 
diff --git a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py index 14f5af076..c810a9c36 100644 --- a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py +++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py @@ -8,18 +8,18 @@ ConnectorNotFoundException, ConnectorStateException, ) -from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig from kpops.component_handlers.kafka_connect.timeout import timeout from kpops.utils.colorify import magentaify from kpops.utils.dict_differ import render_diff -try: - from typing import Self -except ImportError: - from typing_extensions import Self - if TYPE_CHECKING: + try: + from typing import Self + except ImportError: + from typing_extensions import Self + from kpops.cli.pipeline_config import PipelineConfig + from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig log = logging.getLogger("KafkaConnectHandler") @@ -36,8 +36,10 @@ def __init__( def create_connector( self, connector_config: KafkaConnectorConfig, *, dry_run: bool ) -> None: - """ - Creates a connector. If the connector exists the config of that connector gets updated. + """Create a connector. + + If the connector exists the config of that connector gets updated. + :param connector_config: The connector config. :param dry_run: If the connector creation should be run in dry run mode. """ @@ -64,8 +66,8 @@ def create_connector( ) def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None: - """ - Deletes a connector resource from the cluster. + """Delete a connector resource from the cluster. + :param connector_name: The connector name. :param dry_run: If the connector deletion should be run in dry run mode. """ @@ -112,9 +114,8 @@ def __dry_run_connector_creation( errors = self._connect_wrapper.validate_connector_config(connector_config) if len(errors) > 0: formatted_errors = "\n".join(errors) - raise ConnectorStateException( - f"Connector Creation: validating the connector config for connector {connector_name} resulted in the following errors: {formatted_errors}" - ) + msg = f"Connector Creation: validating the connector config for connector {connector_name} resulted in the following errors: {formatted_errors}" + raise ConnectorStateException(msg) else: log.info( f"Connector Creation: connector config for {connector_name} is valid!" diff --git a/kpops/component_handlers/kafka_connect/model.py b/kpops/component_handlers/kafka_connect/model.py index 9feed448f..e83e33e5d 100644 --- a/kpops/component_handlers/kafka_connect/model.py +++ b/kpops/component_handlers/kafka_connect/model.py @@ -13,7 +13,7 @@ class KafkaConnectorType(str, Enum): class KafkaConnectorConfig(BaseModel): - """Settings specific to Kafka Connectors""" + """Settings specific to Kafka Connectors.""" connector_class: str name: str = Field(default=..., hidden_from_schema=True) @@ -31,7 +31,8 @@ def schema_extra(cls, schema: dict[str, Any], model: type[BaseModel]) -> None: @validator("connector_class") def connector_class_must_contain_dot(cls, connector_class: str) -> str: if "." 
not in connector_class: - raise ValueError(f"Invalid connector class {connector_class}") + msg = f"Invalid connector class {connector_class}" + raise ValueError(msg) return connector_class @property diff --git a/kpops/component_handlers/kafka_connect/timeout.py b/kpops/component_handlers/kafka_connect/timeout.py index d93d608b7..e75ac7361 100644 --- a/kpops/component_handlers/kafka_connect/timeout.py +++ b/kpops/component_handlers/kafka_connect/timeout.py @@ -1,7 +1,8 @@ import asyncio import logging from asyncio import TimeoutError -from typing import Callable, TypeVar +from collections.abc import Callable +from typing import TypeVar log = logging.getLogger("Timeout") @@ -9,10 +10,10 @@ def timeout(func: Callable[..., T], *, secs: int = 0) -> T | None: - """ - Sets a timeout for a given lambda function + """Set a timeout for a given lambda function. + :param func: The callable function - :param secs: The timeout in seconds + :param secs: The timeout in seconds. """ async def main_supervisor(func: Callable[..., T], secs: int) -> T: @@ -25,9 +26,8 @@ async def main_supervisor(func: Callable[..., T], secs: int) -> T: loop = asyncio.get_event_loop() try: - complete = loop.run_until_complete(main_supervisor(func, secs)) - return complete + return loop.run_until_complete(main_supervisor(func, secs)) except TimeoutError: - log.error( + log.exception( f"Kafka Connect operation {func.__name__} timed out after {secs} seconds. To increase the duration, set the `timeout` option in config.yaml." ) diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index a053ccc62..63d88b726 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -3,20 +3,23 @@ import json import logging from functools import cached_property +from typing import TYPE_CHECKING from schema_registry.client import SchemaRegistryClient from schema_registry.client.schema import AvroSchema from kpops.cli.exception import ClassNotFoundError -from kpops.cli.pipeline_config import PipelineConfig from kpops.cli.registry import find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, ) -from kpops.components.base_components.models.to_section import ToSection from kpops.utils.colorify import greenify, magentaify +if TYPE_CHECKING: + from kpops.cli.pipeline_config import PipelineConfig + from kpops.components.base_components.models.to_section import ToSection + log = logging.getLogger("SchemaHandler") @@ -29,16 +32,13 @@ def __init__(self, url: str, components_module: str | None): def schema_provider(self) -> SchemaProvider: try: if not self.components_module: - raise ValueError( - f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." - ) + msg = f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." + raise ValueError(msg) schema_provider_class = find_class(self.components_module, SchemaProvider) return schema_provider_class() # pyright: ignore[reportGeneralTypeIssues] - except ClassNotFoundError: - raise ValueError( - f"No schema provider found in components module {self.components_module}. 
" - f"Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." - ) + except ClassNotFoundError as e: + msg = f"No schema provider found in components module {self.components_module}. Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." + raise ValueError(msg) from e @classmethod def load_schema_handler( @@ -144,9 +144,8 @@ def __check_compatibility( if isinstance(schema, AvroSchema) else str(schema) ) - raise Exception( - f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}" - ) + msg = f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}" + raise Exception(msg) else: log.debug( f"Schema Submission: schema was already submitted for the subject {subject} as version {registered_version.schema}. Therefore, the specified schema must be compatible." diff --git a/kpops/component_handlers/schema_handler/schema_provider.py b/kpops/component_handlers/schema_handler/schema_provider.py index 2b93bf943..0c0423a40 100644 --- a/kpops/component_handlers/schema_handler/schema_provider.py +++ b/kpops/component_handlers/schema_handler/schema_provider.py @@ -1,11 +1,12 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TypeAlias +from typing import TYPE_CHECKING, TypeAlias from schema_registry.client.schema import AvroSchema, JsonSchema -from kpops.components.base_components.models import ModelName, ModelVersion +if TYPE_CHECKING: + from kpops.components.base_components.models import ModelName, ModelVersion Schema: TypeAlias = AvroSchema | JsonSchema diff --git a/kpops/component_handlers/topic/handler.py b/kpops/component_handlers/topic/handler.py index 1df0d106a..dae606108 100644 --- a/kpops/component_handlers/topic/handler.py +++ b/kpops/component_handlers/topic/handler.py @@ -65,7 +65,7 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: self.proxy_wrapper.create_topic(topic_spec=topic_spec) def delete_topics(self, to_section: ToSection, dry_run: bool) -> None: - for topic_name in to_section.topics.keys(): + for topic_name in to_section.topics: if dry_run: self.__dry_run_topic_deletion(topic_name=topic_name) else: @@ -148,9 +148,8 @@ def __check_partition_count( f"Topic Creation: partition count of topic {topic_name} did not change. Current partitions count {partition_count}. Updating configs." ) else: - raise TopicTransactionError( - f"Topic Creation: partition count of topic {topic_name} changed! Partitions count of topic {topic_name} is {partition_count}. The given partitions count {topic_spec.partitions_count}." - ) + msg = f"Topic Creation: partition count of topic {topic_name} changed! Partitions count of topic {topic_name} is {partition_count}. The given partitions count {topic_spec.partitions_count}." + raise TopicTransactionError(msg) @staticmethod def __check_replication_factor( @@ -168,9 +167,8 @@ def __check_replication_factor( f"Topic Creation: replication factor of topic {topic_name} did not change. Current replication factor {replication_factor}. Updating configs." ) else: - raise TopicTransactionError( - f"Topic Creation: replication factor of topic {topic_name} changed! Replication factor of topic {topic_name} is {replication_factor}. The given replication count {topic_spec.replication_factor}." - ) + msg = f"Topic Creation: replication factor of topic {topic_name} changed! Replication factor of topic {topic_name} is {replication_factor}. 
The given replication count {topic_spec.replication_factor}." + raise TopicTransactionError(msg) def __dry_run_topic_deletion(self, topic_name: str) -> None: try: @@ -199,11 +197,11 @@ def __dry_run_topic_deletion(self, topic_name: str) -> None: @classmethod def __prepare_body(cls, topic_name: str, topic_config: TopicConfig) -> TopicSpec: - """ - Prepares the POST request body needed for the topic creation + """Prepare the POST request body needed for the topic creation. + :param topic_name: The name of the topic :param topic_config: The topic config - :return: + :return: Topic specification """ topic_spec_json: dict = topic_config.dict( include={ diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index af7914379..4edc3633c 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -21,27 +21,26 @@ class ProxyWrapper: - """ - Wraps Kafka REST Proxy APIs - """ + """Wraps Kafka REST Proxy APIs.""" def __init__(self, pipeline_config: PipelineConfig) -> None: if not pipeline_config.kafka_rest_host: - raise ValueError( - "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environment variable KPOPS_REST_PROXY_HOST." - ) + msg = "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environment variable KPOPS_REST_PROXY_HOST." + raise ValueError(msg) self._host = pipeline_config.kafka_rest_host @cached_property def cluster_id(self) -> str: - """ - Gets the Kafka cluster ID by sending a requests to Kafka REST proxy. + """Get the Kafka cluster ID by sending a request to Kafka REST proxy. + More information about the cluster ID can be found here: - https://docs.confluent.io/platform/current/kafka-rest/api.html#cluster-v3 + https://docs.confluent.io/platform/current/kafka-rest/api.html#cluster-v3. Currently both Kafka and Kafka REST Proxy are only aware of the Kafka cluster pointed at by the bootstrap.servers configuration. Therefore, only one Kafka cluster will be returned. + + :raises KafkaRestProxyError: Kafka REST proxy error :return: The Kafka cluster ID. """ response = httpx.get(url=f"{self._host}/v3/clusters") @@ -56,10 +55,13 @@ def host(self) -> str: return self._host def create_topic(self, topic_spec: TopicSpec) -> None: - """ - Creates a topic. - API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#post--clusters-cluster_id-topics + """Create a topic. + + API Reference: + https://docs.confluent.io/platform/current/kafka-rest/api.html#post--clusters-cluster_id-topics + :param topic_spec: The topic specification. + :raises KafkaRestProxyError: Kafka REST proxy error """ response = httpx.post( url=f"{self._host}/v3/clusters/{self.cluster_id}/topics", @@ -74,10 +76,13 @@ def create_topic(self, topic_spec: TopicSpec) -> None: raise KafkaRestProxyError(response) def delete_topic(self, topic_name: str) -> None: - """ - Deletes a topic - API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#delete--clusters-cluster_id-topics-topic_name - :param topic_name: Name of the topic + """Delete a topic. + + API Reference: + https://docs.confluent.io/platform/current/kafka-rest/api.html#delete--clusters-cluster_id-topics-topic_name + + :param topic_name: Name of the topic. 
+ :raises KafkaRestProxyError: Kafka REST proxy error """ response = httpx.delete( url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}", @@ -90,11 +95,15 @@ def delete_topic(self, topic_name: str) -> None: raise KafkaRestProxyError(response) def get_topic(self, topic_name: str) -> TopicResponse: - """ - Returns the topic with the given topic_name. - API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id-topics-topic_name + """Return the topic with the given topic_name. + + API Reference: + https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id-topics-topic_name + :param topic_name: The topic name. - :return: Response of the get topic API + :raises TopicNotFoundException: Topic not found + :raises KafkaRestProxyError: Kafka REST proxy error + :return: Response of the get topic API. """ response = httpx.get( url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}", @@ -111,15 +120,19 @@ def get_topic(self, topic_name: str) -> TopicResponse: ): log.debug(f"Topic {topic_name} not found.") log.debug(response.json()) - raise TopicNotFoundException() + raise TopicNotFoundException raise KafkaRestProxyError(response) def get_topic_config(self, topic_name: str) -> TopicConfigResponse: - """ - Return the config with the given topic_name. - API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#acl-v3 + """Return the config with the given topic_name. + + API Reference: + https://docs.confluent.io/platform/current/kafka-rest/api.html#acl-v3 + :param topic_name: The topic name. + :raises TopicNotFoundException: Topic not found + :raises KafkaRestProxyError: Kafka REST proxy error :return: The topic configuration. """ response = httpx.get( @@ -138,16 +151,19 @@ def get_topic_config(self, topic_name: str) -> TopicConfigResponse: ): log.debug(f"Configs for {topic_name} not found.") log.debug(response.json()) - raise TopicNotFoundException() + raise TopicNotFoundException raise KafkaRestProxyError(response) def batch_alter_topic_config(self, topic_name: str, json_body: list[dict]) -> None: - """ - Reset config of given config_name param to the default value on the kafka server. - API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#post--clusters-cluster_id-topics-topic_name-configs-alter + """Reset config of given config_name param to the default value on the kafka server. + + API Reference: + https://docs.confluent.io/platform/current/kafka-rest/api.html#post--clusters-cluster_id-topics-topic_name-configs-alter + :param topic_name: The topic name. :param config_name: The configuration parameter name. + :raises KafkaRestProxyError: Kafka REST proxy error """ response = httpx.post( url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter", @@ -161,9 +177,12 @@ def batch_alter_topic_config(self, topic_name: str, json_body: list[dict]) -> No raise KafkaRestProxyError(response) def get_broker_config(self) -> BrokerConfigResponse: - """ - Return the list of configuration parameters for all the brokers in the given Kafka cluster. - API Reference: https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id-brokers---configs + """Return the list of configuration parameters for all the brokers in the given Kafka cluster. 
+ + API Reference: + https://docs.confluent.io/platform/current/kafka-rest/api.html#get--clusters-cluster_id-brokers---configs + + :raises KafkaRestProxyError: Kafka REST proxy error :return: The broker configuration. """ response = httpx.get( diff --git a/kpops/component_handlers/utils/exception.py b/kpops/component_handlers/utils/exception.py index fe906190f..5de7f7717 100644 --- a/kpops/component_handlers/utils/exception.py +++ b/kpops/component_handlers/utils/exception.py @@ -10,11 +10,11 @@ def __init__(self, response: httpx.Response) -> None: self.error_code = response.status_code self.error_msg = "Something went wrong!" try: - log.error( - f"The request responded with the code {self.error_code}. Error body: {response.json()}" + log.exception( + f"The request responded with the code {self.error_code}. Error body: {response.json()}", ) response.raise_for_status() except httpx.HTTPError as e: self.error_msg = str(e) - log.error(f"More information: {self.error_msg}") + log.exception(f"More information: {self.error_msg}") super().__init__() diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index 99dec42f2..d9100bd25 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -77,15 +77,15 @@ def __init__(self, **kwargs) -> None: self._validate_custom(**kwargs) @cached_classproperty - def type(cls: type[Self]) -> str: # pyright: ignore - """Return calling component's type + def type(cls: type[Self]) -> str: # pyright: ignore[reportGeneralTypeIssues] + """Return calling component's type. :returns: Component class name in dash-case """ return to_dash(cls.__name__) def extend_with_defaults(self, **kwargs) -> dict: - """Merge parent components' defaults with own + """Merge parent components' defaults with own. :param kwargs: The init kwargs for pydantic :returns: Enriched kwargs with inherited defaults @@ -105,15 +105,13 @@ defaults = load_defaults( self.__class__, main_default_file_path, environment_default_file_path ) - kwargs = update_nested(kwargs, defaults) - return kwargs + return update_nested(kwargs, defaults) def _validate_custom(self, **kwargs) -> None: """Run custom validation on component. :param kwargs: The init kwargs for the component """ - pass def load_defaults( @@ -121,7 +119,7 @@ defaults_file_path: Path, environment_defaults_file_path: Path | None = None, ) -> dict: - """Resolve component-specific defaults including environment defaults + """Resolve component-specific defaults including environment defaults. :param component_class: Component class :param defaults_file_path: Path to `defaults.yaml` @@ -153,7 +151,7 @@ def load_defaults( def defaults_from_yaml(path: Path, key: str) -> dict: - """Read component-specific settings from a defaults yaml file and return @default if not found + """Read component-specific settings from a defaults yaml file and return @default if not found. 
:param path: Path to defaults yaml file :param key: Component type @@ -165,9 +163,10 @@ """ content = load_yaml_file(path, substitution=ENV) if not isinstance(content, dict): - raise TypeError( + msg = ( "Default files should be structured as map ([app type] -> [default config])" ) + raise TypeError(msg) value = content.get(key) if value is None: return {} @@ -178,7 +177,7 @@ def get_defaults_file_paths(config: PipelineConfig) -> tuple[Path, Path]: - """Return the paths to the main and the environment defaults-files + """Return the paths to the main and the environment defaults-files. The files need not exist; this function will only check if the dir set in `config.defaults_path` exists and return paths to the defaults files diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index c522e040e..a13dc7a7d 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -22,7 +22,7 @@ class KafkaStreamsConfig(BaseModel): - """Kafka Streams config + """Kafka Streams config. :param brokers: Brokers :param schema_registry_url: URL of the schema registry, defaults to None @@ -38,7 +38,7 @@ class Config(CamelCaseConfig, DescConfig): class KafkaAppConfig(KubernetesAppConfig): - """Settings specific to Kafka Apps + """Settings specific to Kafka Apps. :param streams: Kafka streams config :param name_override: Override name with this value, defaults to None @@ -82,8 +82,8 @@ class KafkaApp(KubernetesApp, ABC): @property def clean_up_helm_chart(self) -> str: - """Helm chart used to destroy and clean this component""" - raise NotImplementedError() + """Helm chart used to destroy and clean this component.""" + raise NotImplementedError @override def deploy(self, dry_run: bool) -> None: @@ -104,7 +104,7 @@ def _run_clean_up_job( dry_run: bool, retain_clean_jobs: bool = False, ) -> None: - """Clean an app using the respective cleanup job + """Clean an app using the respective cleanup job. :param values: The value YAML for the chart :param dry_run: Dry run command @@ -133,7 +133,7 @@ self.__uninstall_clean_up_job(clean_up_release_name, dry_run) def __uninstall_clean_up_job(self, release_name: str, dry_run: bool) -> None: - """Uninstall clean up job + """Uninstall clean up job. :param release_name: Name of the Helm release :param dry_run: Whether to do a dry run of the command @@ -147,7 +147,7 @@ def __install_clean_up_job( values: dict, dry_run: bool, ) -> str: - """Install clean up job + """Install clean up job. :param release_name: Name of the Helm release :param suffix: Suffix to add to the release name, e.g. 
"-clean" diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index e53886d68..96ee68041 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -3,7 +3,7 @@ import logging from abc import ABC from functools import cached_property -from typing import Any, NoReturn +from typing import TYPE_CHECKING, Any, NoReturn from pydantic import Field, validator from typing_extensions import override @@ -25,16 +25,18 @@ KafkaConnectResetterValues, ) from kpops.components.base_components.base_defaults_component import deduplicate -from kpops.components.base_components.models.from_section import FromTopic from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr +if TYPE_CHECKING: + from kpops.components.base_components.models.from_section import FromTopic + log = logging.getLogger("KafkaConnector") class KafkaConnector(PipelineComponent, ABC): - """Base class for all Kafka connectors + """Base class for all Kafka connectors. Should only be used to set defaults @@ -85,13 +87,14 @@ def connector_config_should_have_component_name( component_name = values["prefix"] + values["name"] connector_name: str | None = app.get("name") if connector_name is not None and connector_name != component_name: - raise ValueError("Connector name should be the same as component name") + msg = "Connector name should be the same as component name" + raise ValueError(msg) app["name"] = component_name return app @cached_property def helm(self) -> Helm: - """Helm object that contains component-specific config such as repo""" + """Helm object that contains component-specific config such as repo.""" helm_repo_config = self.repo_config helm = Helm(self.config.helm_config) helm.add_repo( @@ -105,8 +108,7 @@ def helm(self) -> Helm: def _resetter_release_name(self) -> str: suffix = "-clean" clean_up_release_name = self.full_name + suffix - trimmed_name = trim_release_name(clean_up_release_name, suffix) - return trimmed_name + return trim_release_name(clean_up_release_name, suffix) @property def _resetter_helm_chart(self) -> str: @@ -119,7 +121,7 @@ def dry_run_handler(self) -> DryRunHandler: @property def helm_flags(self) -> HelmFlags: - """Return shared flags for Helm commands""" + """Return shared flags for Helm commands.""" return HelmFlags( **self.repo_config.repo_auth_flags.dict(), version=self.version, @@ -128,7 +130,7 @@ def helm_flags(self) -> HelmFlags: @property def template_flags(self) -> HelmTemplateFlags: - """Return flags for Helm template command""" + """Return flags for Helm template command.""" return HelmTemplateFlags( **self.helm_flags.dict(), api_version=self.config.helm_config.api_version, @@ -169,7 +171,7 @@ def _run_connect_resetter( retain_clean_jobs: bool, **kwargs, ) -> None: - """Clean the connector from the cluster + """Clean the connector from the cluster. At first, it deletes the previous cleanup job (connector resetter) to make sure that there is no running clean job in the cluster. Then it releases a cleanup job. @@ -208,7 +210,7 @@ def __install_connect_resetter( dry_run: bool, **kwargs, ) -> str: - """Install connector resetter + """Install connector resetter. 
:param dry_run: Whether to dry run the command :return: The output of `helm upgrade --install` @@ -233,7 +235,7 @@ def _get_kafka_connect_resetter_values( self, **kwargs, ) -> dict: - """Get connector resetter helm chart values + """Get connector resetter helm chart values. :return: The Helm chart values of the connector resetter """ @@ -251,7 +253,7 @@ def _get_kafka_connect_resetter_values( } def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None: - """Uninstall connector resetter + """Uninstall connector resetter. :param release_name: Name of the release to be uninstalled :param dry_run: Whether to do a dry run of the command @@ -264,7 +266,7 @@ def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None class KafkaSourceConnector(KafkaConnector): - """Kafka source connector model + """Kafka source connector model. :param offset_topic: offset.storage.topic, more info: https://kafka.apache.org/documentation/#connect_running, @@ -280,7 +282,8 @@ class KafkaSourceConnector(KafkaConnector): @override def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: - raise NotImplementedError("Kafka source connector doesn't support FromSection") + msg = "Kafka source connector doesn't support FromSection" + raise NotImplementedError(msg) @override def template(self) -> None: @@ -306,7 +309,7 @@ def clean(self, dry_run: bool) -> None: self.__run_kafka_connect_resetter(dry_run) def __run_kafka_connect_resetter(self, dry_run: bool) -> None: - """Runs the connector resetter + """Run the connector resetter. :param dry_run: Whether to do a dry run of the command """ @@ -318,7 +321,7 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None: class KafkaSinkConnector(KafkaConnector): - """Kafka sink connector model""" + """Kafka sink connector model.""" _connector_type = KafkaConnectorType.SINK @@ -361,7 +364,7 @@ def clean(self, dry_run: bool) -> None: def __run_kafka_connect_resetter( self, dry_run: bool, delete_consumer_group: bool ) -> None: - """Runs the connector resetter + """Run the connector resetter. 
:param dry_run: Whether to do a dry run of the command :param delete_consumer_group: Whether the consumer group should be deleted or not diff --git a/kpops/components/base_components/kubernetes_app.py b/kpops/components/base_components/kubernetes_app.py index 88ef8380d..ff35459c3 100644 --- a/kpops/components/base_components/kubernetes_app.py +++ b/kpops/components/base_components/kubernetes_app.py @@ -30,7 +30,7 @@ class KubernetesAppConfig(BaseModel): - """Settings specific to Kubernetes Apps""" + """Settings specific to Kubernetes Apps.""" class Config(CamelCaseConfig, DescConfig): extra = Extra.allow @@ -68,7 +68,7 @@ class KubernetesApp(PipelineComponent): @cached_property def helm(self) -> Helm: - """Helm object that contains component-specific config such as repo""" + """Helm object that contains component-specific config such as repo.""" helm = Helm(self.config.helm_config) if self.repo_config is not None: helm.add_repo( @@ -80,7 +80,7 @@ def helm(self) -> Helm: @cached_property def helm_diff(self) -> HelmDiff: - """Helm diff object of last and current release of this component""" + """Helm diff object of last and current release of this component.""" return HelmDiff(self.config.helm_diff_config) @cached_property @@ -95,14 +95,15 @@ def helm_release_name(self) -> str: @property def helm_chart(self) -> str: - """Return component's Helm chart""" - raise NotImplementedError( + """Return component's Helm chart.""" + msg = ( f"Please implement the helm_chart property of the {self.__module__} module." ) + raise NotImplementedError(msg) @property def helm_flags(self) -> HelmFlags: - """Return shared flags for Helm commands""" + """Return shared flags for Helm commands.""" auth_flags = self.repo_config.repo_auth_flags.dict() if self.repo_config else {} return HelmFlags( **auth_flags, @@ -112,7 +113,7 @@ def helm_flags(self) -> HelmFlags: @property def template_flags(self) -> HelmTemplateFlags: - """Return flags for Helm template command""" + """Return flags for Helm template command.""" return HelmTemplateFlags( **self.helm_flags.dict(), api_version=self.config.helm_config.api_version, @@ -131,7 +132,7 @@ def template(self) -> None: @property def deploy_flags(self) -> HelmUpgradeInstallFlags: - """Return flags for Helm upgrade install command""" + """Return flags for Helm upgrade install command.""" return HelmUpgradeInstallFlags(**self.helm_flags.dict()) @override @@ -159,14 +160,14 @@ def destroy(self, dry_run: bool) -> None: log.info(magentaify(stdout)) def to_helm_values(self) -> dict: - """Generate a dictionary of values readable by Helm from `self.app` + """Generate a dictionary of values readable by Helm from `self.app`. :returns: The values to be used by Helm """ return self.app.dict(by_alias=True, exclude_none=True, exclude_defaults=True) def print_helm_diff(self, stdout: str) -> None: - """Print the diff of the last and current release of this component + """Print the diff of the last and current release of this component. :param stdout: The output of a Helm command that installs or upgrades the release """ @@ -187,13 +188,14 @@ def _validate_custom(self, **kwargs) -> None: @staticmethod def validate_kubernetes_name(name: str) -> None: - """Check if a name is valid for a Kubernetes resource + """Check if a name is valid for a Kubernetes resource. :param name: Name that is to be used for the resource :raises ValueError: The component name {name} is invalid for Kubernetes. 
""" if not bool(KUBERNETES_NAME_CHECK_PATTERN.match(name)): - raise ValueError(f"The component name {name} is invalid for Kubernetes.") + msg = f"The component name {name} is invalid for Kubernetes." + raise ValueError(msg) @override def dict(self, *, exclude=None, **kwargs) -> dict[str, Any]: diff --git a/kpops/components/base_components/models/from_section.py b/kpops/components/base_components/models/from_section.py index a3188a17b..153133639 100644 --- a/kpops/components/base_components/models/from_section.py +++ b/kpops/components/base_components/models/from_section.py @@ -9,7 +9,7 @@ class InputTopicTypes(str, Enum): - """Input topic types + """Input topic types. INPUT (input topic), PATTERN (extra-topic-pattern or input-topic-pattern) """ @@ -19,7 +19,7 @@ class InputTopicTypes(str, Enum): class FromTopic(BaseModel): - """Input topic + """Input topic. :param type: Topic type, defaults to None :param role: Custom identifier belonging to a topic; @@ -37,9 +37,10 @@ class Config(DescConfig): @root_validator def extra_topic_role(cls, values: dict[str, Any]) -> dict[str, Any]: - """Ensure that cls.role is used correctly, assign type if needed""" + """Ensure that cls.role is used correctly, assign type if needed.""" if values["type"] == InputTopicTypes.INPUT and values["role"]: - raise ValueError("Define role only if `type` is `pattern` or `None`") + msg = "Define role only if `type` is `pattern` or `None`" + raise ValueError(msg) return values @@ -47,7 +48,7 @@ def extra_topic_role(cls, values: dict[str, Any]) -> dict[str, Any]: class FromSection(BaseModel): - """Holds multiple input topics + """Holds multiple input topics. :param topics: Input topics :param components: Components to read from diff --git a/kpops/components/base_components/models/to_section.py b/kpops/components/base_components/models/to_section.py index cbad0987a..03f1d7141 100644 --- a/kpops/components/base_components/models/to_section.py +++ b/kpops/components/base_components/models/to_section.py @@ -9,7 +9,7 @@ class OutputTopicTypes(str, Enum): - """Types of output topic + """Types of output topic. OUTPUT (output topic), ERROR (error topic) """ @@ -19,7 +19,7 @@ class OutputTopicTypes(str, Enum): class TopicConfig(BaseModel): - """Configure an output topic + """Configure an output topic. :param type: Topic type :param key_schema: Key schema class name @@ -65,14 +65,15 @@ class Config(DescConfig): @root_validator def extra_topic_role(cls, values: dict[str, Any]) -> dict[str, Any]: - """Ensure that cls.role is used correctly, assign type if needed""" + """Ensure that cls.role is used correctly, assign type if needed.""" if values["type"] and values["role"]: - raise ValueError("Define `role` only if `type` is undefined") + msg = "Define `role` only if `type` is undefined" + raise ValueError(msg) return values class ToSection(BaseModel): - """Holds multiple output topics + """Holds multiple output topics. :param topics: Output topics :param models: Data models diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index cf4bafa52..d05d4d4c1 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -22,7 +22,7 @@ class PipelineComponent(BaseDefaultsComponent, ABC): - """Base class for all components + """Base class for all components. :param name: Component name :param prefix: Pipeline prefix that will prefix every component name. 
@@ -76,39 +76,39 @@ def add_extra_input_topics(self, role: str, topics: list[str]) -> None: """ def set_input_pattern(self, name: str) -> None: - """Set input pattern + """Set input pattern. :param name: Input pattern name """ def add_extra_input_pattern(self, role: str, topic: str) -> None: - """Add an input pattern of type extra + """Add an input pattern of type extra. :param role: Custom identifier belonging to one or multiple topics :param topic: Topic name """ def set_output_topic(self, topic_name: str) -> None: - """Set output topic + """Set output topic. :param topic_name: Output topic name """ def set_error_topic(self, topic_name: str) -> None: - """Set error topic + """Set error topic. :param topic_name: Error topic name """ def add_extra_output_topic(self, topic_name: str, role: str) -> None: - """Add an output topic of type extra + """Add an output topic of type extra. :param topic_name: Output topic name :param role: Role that is unique to the extra output topic """ def set_input_topics(self) -> None: - """Put values of config.from into the streams config section of streams bootstrap + """Put values of config.from into the streams config section of streams bootstrap. Supports extra_input_topics (topics by role) or input_topics. """ @@ -117,7 +117,7 @@ def set_input_topics(self) -> None: self.apply_from_inputs(name, topic) def apply_from_inputs(self, name: str, topic: FromTopic) -> None: - """Add a `from` section input to the component config + """Add a `from` section input to the component config. :param name: Name of the field :param topic: Value of the field @@ -133,7 +133,7 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> None: self.add_input_topics([name]) def set_output_topics(self) -> None: - """Put values of config.to into the producer config section of streams bootstrap + """Put values of config.to into the producer config section of streams bootstrap. Supports extra_output_topics (topics by role) or output_topics. """ @@ -142,7 +142,7 @@ def set_output_topics(self) -> None: self.apply_to_outputs(name, topic) def apply_to_outputs(self, name: str, topic: TopicConfig) -> None: - """Add a `to` section input to the component config + """Add a `to` section input to the component config. :param name: Name of the field :param topic: Value of the field @@ -158,12 +158,14 @@ def apply_to_outputs(self, name: str, topic: TopicConfig) -> None: def weave_from_topics( self, to: ToSection | None, - from_topic: FromTopic = FromTopic(type=InputTopicTypes.INPUT), + from_topic: FromTopic | None = None, ) -> None: - """Weave output topics of upstream component or from component into config + """Weave output topics of upstream component or from component into config. Override this method to apply custom logic """ + if from_topic is None: + from_topic = FromTopic(type=InputTopicTypes.INPUT) if not to: return input_topics = [ @@ -175,7 +177,7 @@ def weave_from_topics( self.apply_from_inputs(input_topic, from_topic) def inflate(self) -> list[PipelineComponent]: - """Inflate a component + """Inflate a component. This is helpful if one component should result in multiple components. To support this, override this method and return a list of components @@ -185,8 +187,7 @@ def inflate(self) -> list[PipelineComponent]: return [self] def template(self) -> None: - """ - Runs `helm template` + """Run `helm template`. From HELM: Render chart templates locally and display the output. 
Any values that would normally be looked up or retrieved in-cluster will @@ -195,25 +196,25 @@ def template(self) -> None: """ def deploy(self, dry_run: bool) -> None: - """Deploy the component (self) to the k8s cluster + """Deploy the component (self) to the k8s cluster. :param dry_run: Whether to do a dry run of the command """ def destroy(self, dry_run: bool) -> None: - """Uninstall the component (self) from the k8s cluster + """Uninstall the component (self) from the k8s cluster. :param dry_run: Whether to do a dry run of the command """ def reset(self, dry_run: bool) -> None: - """Reset component (self) state + """Reset component (self) state. :param dry_run: Whether to do a dry run of the command """ def clean(self, dry_run: bool) -> None: - """Remove component (self) and any trace of it + """Remove component (self) and any trace of it. :param dry_run: Whether to do a dry run of the command """ diff --git a/kpops/components/streams_bootstrap/producer/model.py b/kpops/components/streams_bootstrap/producer/model.py index 3c4ae6e46..8af1a68c6 100644 --- a/kpops/components/streams_bootstrap/producer/model.py +++ b/kpops/components/streams_bootstrap/producer/model.py @@ -8,7 +8,7 @@ class ProducerStreamsConfig(KafkaStreamsConfig): - """Kafka Streams settings specific to Producer + """Kafka Streams settings specific to Producer. :param extra_output_topics: Extra output topics :param output_topic: Output topic, defaults to None @@ -23,7 +23,7 @@ class ProducerStreamsConfig(KafkaStreamsConfig): class ProducerValues(KafkaAppConfig): - """Settings specific to producers + """Settings specific to producers. :param streams: Kafka Streams settings """ diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 022ff3e5e..6091cdd77 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -1,4 +1,4 @@ -from __future__ import annotations +# from __future__ import annotations from pydantic import Field from typing_extensions import override @@ -14,7 +14,7 @@ class ProducerApp(KafkaApp): - """Producer component + """Producer component. This producer holds configuration to use as values for the streams bootstrap producer helm chart. @@ -40,7 +40,8 @@ class ProducerApp(KafkaApp): def apply_to_outputs(self, name: str, topic: TopicConfig) -> None: match topic.type: case OutputTopicTypes.ERROR: - raise ValueError("Producer apps do not support error topics") + msg = "Producer apps do not support error topics" + raise ValueError(msg) case _: super().apply_to_outputs(name, topic) diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py index aabbe8237..ca2db77ae 100644 --- a/kpops/components/streams_bootstrap/streams/model.py +++ b/kpops/components/streams_bootstrap/streams/model.py @@ -1,4 +1,5 @@ -from typing import AbstractSet, Any, Mapping +from collections.abc import Mapping, Set +from typing import Any from pydantic import BaseConfig, BaseModel, Extra, Field from typing_extensions import override @@ -13,7 +14,7 @@ class StreamsConfig(KafkaStreamsConfig): - """Streams Bootstrap streams section + """Streams Bootstrap streams section. 
:param input_topics: Input topics, defaults to [] :param input_pattern: Input pattern, defaults to None @@ -75,14 +76,14 @@ def add_extra_input_topics(self, role: str, topics: list[str]) -> None: def dict( self, *, - include: None | AbstractSet[int | str] | Mapping[int | str, Any] = None, - exclude: None | AbstractSet[int | str] | Mapping[int | str, Any] = None, + include: None | Set[int | str] | Mapping[int | str, Any] = None, + exclude: None | Set[int | str] | Mapping[int | str, Any] = None, by_alias: bool = False, skip_defaults: bool | None = None, exclude_unset: bool = False, **kwargs, ) -> dict: - """Generate a dictionary representation of the model + """Generate a dictionary representation of the model. Optionally, specify which fields to include or exclude. @@ -105,7 +106,7 @@ def dict( class StreamsAppAutoScaling(BaseModel): - """Kubernetes Event-driven Autoscaling config + """Kubernetes Event-driven Autoscaling config. :param enabled: Whether to enable auto-scaling using KEDA., defaults to False :param consumer_group: Name of the consumer group used for checking the diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 956980ff7..a466b4eba 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -1,5 +1,3 @@ -from __future__ import annotations - from pydantic import Field from typing_extensions import override @@ -10,7 +8,7 @@ class StreamsApp(KafkaApp): - """StreamsApp component that configures a streams bootstrap app + """StreamsApp component that configures a streams bootstrap app. :param app: Application-specific settings """ @@ -67,7 +65,7 @@ def clean(self, dry_run: bool) -> None: self.__run_streams_clean_up_job(dry_run, delete_output=True) def __run_streams_clean_up_job(self, dry_run: bool, delete_output: bool) -> None: - """Run clean job for this Streams app + """Run clean job for this Streams app. 
:param dry_run: Whether to do a dry run of the command :param delete_output: Whether to delete the output of the app that is being cleaned diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py index 093a452ea..920eec202 100644 --- a/kpops/pipeline_generator/pipeline.py +++ b/kpops/pipeline_generator/pipeline.py @@ -3,23 +3,27 @@ import json import logging from collections import Counter -from collections.abc import Iterator from contextlib import suppress -from pathlib import Path +from typing import TYPE_CHECKING import yaml from pydantic import BaseModel from rich.console import Console from rich.syntax import Syntax -from kpops.cli.pipeline_config import PipelineConfig -from kpops.cli.registry import Registry -from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.utils.dict_ops import generate_substitution, update_nested_pair from kpops.utils.environment import ENV from kpops.utils.yaml_loading import load_yaml_file, substitute, substitute_nested +if TYPE_CHECKING: + from collections.abc import Iterator + from pathlib import Path + + from kpops.cli.pipeline_config import PipelineConfig + from kpops.cli.registry import Registry + from kpops.component_handlers import ComponentHandlers + log = logging.getLogger("PipelineGenerator") @@ -32,7 +36,7 @@ class ValidationError(Exception): class PipelineComponents(BaseModel): - """Stores the pipeline components""" + """Stores the pipeline components.""" components: list[PipelineComponent] = [] @@ -44,7 +48,8 @@ def find(self, component_name: str) -> PipelineComponent: for component in self.components: if component_name == component.name: return component - raise ValueError(f"Component {component_name} not found") + msg = f"Component {component_name} not found" + raise ValueError(msg) def add(self, component: PipelineComponent) -> None: self._populate_component_name(component) @@ -63,9 +68,8 @@ def validate_unique_names(self) -> None: step_names = [component.full_name for component in self.components] duplicates = [name for name, count in Counter(step_names).items() if count > 1] if duplicates: - raise ValidationError( - f"step names should be unique. duplicate step names: {', '.join(duplicates)}" - ) + msg = f"step names should be unique. duplicate step names: {', '.join(duplicates)}" + raise ValidationError(msg) @staticmethod def _populate_component_name(component: PipelineComponent) -> None: # TODO: remove @@ -79,7 +83,7 @@ def _populate_component_name(component: PipelineComponent) -> None: # TODO: rem def create_env_components_index( environment_components: list[dict], ) -> dict[str, dict]: - """Create an index for all registered components in the project + """Create an index for all registered components in the project. :param environment_components: List of all components to be included :return: component index @@ -87,9 +91,8 @@ def create_env_components_index( index: dict[str, dict] = {} for component in environment_components: if "type" not in component or "name" not in component: - raise ValueError( - "To override components per environment, every component should at least have a type and a name." - ) + msg = "To override components per environment, every component should at least have a type and a name." 
+ raise ValueError(msg) index[component["name"]] = component return index @@ -120,7 +123,7 @@ def load_from_yaml( config: PipelineConfig, handlers: ComponentHandlers, ) -> Pipeline: - """Load pipeline definition from yaml + """Load pipeline definition from yaml. The file is often named ``pipeline.yaml`` @@ -137,22 +140,19 @@ def load_from_yaml( main_content = load_yaml_file(path, substitution=ENV) if not isinstance(main_content, list): - raise TypeError( - f"The pipeline definition {path} should contain a list of components" - ) + msg = f"The pipeline definition {path} should contain a list of components" + raise TypeError(msg) env_content = [] if (env_file := Pipeline.pipeline_filename_environment(path, config)).exists(): env_content = load_yaml_file(env_file, substitution=ENV) if not isinstance(env_content, list): - raise TypeError( - f"The pipeline definition {env_file} should contain a list of components" - ) + msg = f"The pipeline definition {env_file} should contain a list of components" + raise TypeError(msg) - pipeline = cls(main_content, env_content, registry, config, handlers) - return pipeline + return cls(main_content, env_content, registry, config, handlers) def parse_components(self, component_list: list[dict]) -> None: - """Instantiate, enrich and inflate a list of components + """Instantiate, enrich and inflate a list of components. :param component_list: List of components :raises ValueError: Every component must have a type defined @@ -163,19 +163,17 @@ def parse_components(self, component_list: list[dict]) -> None: try: try: component_type: str = component_data["type"] - except KeyError: - raise ValueError( - "Every component must have a type defined, this component does not have one." - ) + except KeyError as ke: + msg = "Every component must have a type defined, this component does not have one." + raise ValueError(msg) from ke component_class = self.registry[component_type] self.apply_component(component_class, component_data) - except Exception as ex: + except Exception as ex: # noqa: BLE001 if "name" in component_data: - raise ParsingException( - f"Error enriching {component_data['type']} component {component_data['name']}" - ) from ex + msg = f"Error enriching {component_data['type']} component {component_data['name']}" + raise ParsingException(msg) from ex else: - raise ParsingException() from ex + raise ParsingException from ex def apply_component( self, component_class: type[PipelineComponent], component_data: dict @@ -224,7 +222,7 @@ def enrich_component( self, component: PipelineComponent, ) -> PipelineComponent: - """Enrich a pipeline component with env-specific config and substitute variables + """Enrich a pipeline component with env-specific config and substitute variables. :param component: Component to be enriched :returns: Enriched component @@ -249,7 +247,7 @@ def enrich_component( ) def print_yaml(self, substitution: dict | None = None) -> None: - """Print the generated pipeline definition + """Print the generated pipeline definition. :param substitution: Substitution dictionary, defaults to None """ @@ -277,7 +275,7 @@ def __len__(self) -> int: return len(self.components) def substitute_in_component(self, component_as_dict: dict) -> dict: - """Substitute all $-placeholders in a component in dict representation + """Substitute all $-placeholders in a component in dict representation. 
:param component_as_dict: Component represented as dict :return: Updated component @@ -311,7 +309,7 @@ def validate(self) -> None: @staticmethod def pipeline_filename_environment(path: Path, config: PipelineConfig) -> Path: - """Add the environment name from the PipelineConfig to the pipeline.yaml path + """Add the environment name from the PipelineConfig to the pipeline.yaml path. :param path: Path to pipeline.yaml file :param config: The PipelineConfig @@ -336,7 +334,8 @@ def set_pipeline_name_env_vars(base_dir: Path, path: Path) -> None: """ path_without_file = path.resolve().relative_to(base_dir.resolve()).parts[:-1] if not path_without_file: - raise ValueError("The pipeline-base-dir should not equal the pipeline-path") + msg = "The pipeline-base-dir should not equal the pipeline-path" + raise ValueError(msg) pipeline_name = "-".join(path_without_file) ENV["pipeline_name"] = pipeline_name for level, parent in enumerate(path_without_file): diff --git a/kpops/utils/dict_differ.py b/kpops/utils/dict_differ.py index 2cdaa95b0..934924e21 100644 --- a/kpops/utils/dict_differ.py +++ b/kpops/utils/dict_differ.py @@ -3,12 +3,15 @@ from dataclasses import dataclass from difflib import Differ from enum import Enum -from typing import Generic, Iterable, Iterator, Sequence, TypeVar +from typing import TYPE_CHECKING, Generic, TypeVar import typer import yaml from dictdiffer import diff, patch +if TYPE_CHECKING: + from collections.abc import Iterable, Iterator, Sequence + differ = Differ() @@ -39,7 +42,8 @@ def factory(type: DiffType, change: T | tuple[T, T]) -> Change: return Change(change, None) case DiffType.CHANGE if isinstance(change, tuple): return Change(*change) - raise ValueError(f"{type} is not part of {DiffType}") + msg = f"{type} is not part of {DiffType}" + raise ValueError(msg) @dataclass @@ -53,9 +57,9 @@ def from_dicts( d1: dict, d2: dict, ignore: set[str] | None = None ) -> Iterator[Diff]: for diff_type, keys, changes in diff(d1, d2, ignore=ignore): - if not isinstance(changes, list): - changes = [("", changes)] - for key, change in changes: + if not isinstance(changes_tmp := changes, list): + changes_tmp = [("", changes)] + for key, change in changes_tmp: yield Diff( DiffType.from_str(diff_type), Diff.__find_changed_key(keys, key), @@ -64,9 +68,7 @@ def from_dicts( @staticmethod def __find_changed_key(key_1: list[str] | str, key_2: str = "") -> str: - """ - Generates a string that points to the changed key in the dictionary. - """ + """Generate a string that points to the changed key in the dictionary.""" if isinstance(key_1, list) and len(key_1) > 1: return f"{key_1[0]}[{key_1[1]}]" if not key_1: diff --git a/kpops/utils/dict_ops.py b/kpops/utils/dict_ops.py index 64e88a89b..14cc849e3 100644 --- a/kpops/utils/dict_ops.py +++ b/kpops/utils/dict_ops.py @@ -1,8 +1,9 @@ -from typing import Any, Mapping +from collections.abc import Mapping +from typing import Any def update_nested_pair(original_dict: dict, other_dict: Mapping) -> dict: - """Nested update for 2 dictionaries + """Nested update for 2 dictionaries. Adds all new fields in ``other_dict`` to ``original_dict``. Does not update existing fields. 
@@ -19,9 +20,8 @@ def update_nested_pair(original_dict: dict, other_dict: Mapping) -> dict: nested_val = original_dict.get(key, {}) if isinstance(nested_val, dict): original_dict[key] = update_nested_pair(nested_val, value) - else: - if key not in original_dict: - original_dict[key] = value + elif key not in original_dict: + original_dict[key] = value return original_dict @@ -48,7 +48,7 @@ def update_nested(*argv: dict) -> dict: def flatten_mapping( nested_mapping: Mapping[str, Any], prefix: str | None = None, separator: str = "_" ) -> dict[str, Any]: - """Flattens a Mapping + """Flattens a Mapping. :param nested_mapping: Nested mapping that is to be flattened :param prefix: Prefix that will be applied to all top-level keys in the output., defaults to None @@ -56,11 +56,13 @@ def flatten_mapping( :returns: "Flattened" mapping in the form of dict """ if not isinstance(nested_mapping, Mapping): - raise TypeError("Argument nested_mapping is not a Mapping") + msg = "Argument nested_mapping is not a Mapping" + raise TypeError(msg) top: dict[str, Any] = {} for key, value in nested_mapping.items(): if not isinstance(key, str): - raise TypeError(f"Argument nested_mapping contains a non-str key: {key}") + msg = f"Argument nested_mapping contains a non-str key: {key}" + raise TypeError(msg) if prefix: key = prefix + separator + key if isinstance(value, Mapping): @@ -76,7 +78,7 @@ def generate_substitution( prefix: str | None = None, existing_substitution: dict | None = None, ) -> dict: - """Generate a complete substitution dict from a given dict + """Generate a complete substitution dict from a given dict. Finds all attributes that belong to a model and expands them to create a dict containing each variable name and value to substitute with. diff --git a/kpops/utils/docstring.py b/kpops/utils/docstring.py index fc6f4c61d..d5ca287d3 100644 --- a/kpops/utils/docstring.py +++ b/kpops/utils/docstring.py @@ -4,7 +4,7 @@ def describe_attr(name: str, docstr: str | None) -> str: - """Read attribute description from class docstring + """Read attribute description from class docstring. **Works only with reStructuredText docstrings.** @@ -19,7 +19,7 @@ def describe_attr(name: str, docstr: str | None) -> str: def describe_object(docstr: str | None) -> str: - """Return description from an object's docstring + """Return description from an object's docstring. Excludes parameters and return definitions @@ -44,7 +44,7 @@ def describe_object(docstr: str | None) -> str: def _trim_description_end(desc: str) -> str: - """Remove the unwanted text that comes after a description in a docstring + """Remove the unwanted text that comes after a description in a docstring. Also removes all whitespaces and newlines and replaces them with a single space. 
diff --git a/kpops/utils/environment.py b/kpops/utils/environment.py index c46f83611..0ed7ae920 100644 --- a/kpops/utils/environment.py +++ b/kpops/utils/environment.py @@ -1,7 +1,7 @@ import os import platform from collections import UserDict -from typing import Callable +from collections.abc import Callable class Environment(UserDict): diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index 470a1412d..7cad9422d 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -1,8 +1,9 @@ import inspect import logging from abc import ABC +from collections.abc import Sequence from enum import Enum -from typing import Annotated, Any, Literal, Sequence, Union +from typing import Annotated, Any, Literal, Union from pydantic import BaseConfig, Field, schema, schema_json_of from pydantic.fields import FieldInfo, ModelField @@ -25,7 +26,8 @@ class SchemaScope(str, Enum): # adapted from https://github.com/tiangolo/fastapi/issues/1378#issuecomment-764966955 def field_schema(field: ModelField, **kwargs: Any) -> Any: if field.field_info.extra.get("hidden_from_schema"): - raise SkipField(f"{field.name} field is being hidden") + msg = f"{field.name} field is being hidden" + raise SkipField(msg) else: return original_field_schema(field, **kwargs) @@ -38,8 +40,7 @@ def field_schema(field: ModelField, **kwargs: Any) -> Any: def _is_valid_component( defined_component_types: set[str], component: type[PipelineComponent] ) -> bool: - """ - Check whether a PipelineComponent subclass has a valid definition for the schema generation. + """Check whether a PipelineComponent subclass has a valid definition for the schema generation. :param defined_component_types: types defined so far :param component: component type to be validated @@ -58,7 +59,7 @@ def _is_valid_component( def _add_components( components_module: str, components: tuple[type[PipelineComponent]] | None = None ) -> tuple[type[PipelineComponent]]: - """Add components to a components tuple + """Add components to a components tuple. If an empty tuple is provided or it is not provided at all, the components types from the given module are 'tupled' @@ -69,7 +70,7 @@ def _add_components( :return: Extended tuple """ if components is None: - components = tuple() + components = tuple() # noqa: C408 # Set of existing types, against which to check the new ones defined_component_types = {component.type for component in components} custom_components = ( @@ -95,14 +96,15 @@ def gen_pipeline_schema( log.warning("No components are provided, no schema is generated.") return # Add stock components if enabled - components: tuple[type[PipelineComponent]] = tuple() + components: tuple[type[PipelineComponent]] = tuple() # noqa: C408 if include_stock_components: components = _add_components("kpops.components") # Add custom components if provided if components_module: components = _add_components(components_module, components) if not components: - raise RuntimeError("No valid components found.") + msg = "No valid components found." 
+ raise RuntimeError(msg) # Create a type union that will hold the union of all component types PipelineComponents = Union[components] # type: ignore[valid-type] @@ -110,7 +112,7 @@ def gen_pipeline_schema( for component in components: component.__fields__["type"] = ModelField( name="type", - type_=Literal[component.type], # type: ignore + type_=Literal[component.type], # type: ignore[reportGeneralTypeIssues] required=False, default=component.type, final=True, @@ -137,7 +139,7 @@ def gen_pipeline_schema( def gen_config_schema() -> None: - """Generate a json schema from the model of pipeline config""" + """Generate a json schema from the model of pipeline config.""" schema = schema_json_of( PipelineConfig, title="KPOps config schema", indent=4, sort_keys=True ) diff --git a/kpops/utils/yaml_loading.py b/kpops/utils/yaml_loading.py index cb9536200..fb810c193 100644 --- a/kpops/utils/yaml_loading.py +++ b/kpops/utils/yaml_loading.py @@ -20,7 +20,7 @@ def generate_hashkey( def load_yaml_file( file_path: Path, *, substitution: Mapping[str, Any] | None = None ) -> dict | list[dict]: - with open(file_path) as yaml_file: + with file_path.open() as yaml_file: return yaml.load(substitute(yaml_file.read(), substitution), Loader=yaml.Loader) @@ -70,7 +70,6 @@ def substitute_nested(input: str, **kwargs) -> str: steps.add(new_str) old_str, new_str = new_str, substitute(new_str, kwargs) if new_str != old_str: - raise ValueError( - "An infinite loop condition detected. Check substitution variables." - ) + msg = "An infinite loop condition detected. Check substitution variables." + raise ValueError(msg) return old_str diff --git a/poetry.lock b/poetry.lock index 339e37345..9a50b9ae1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "aiofiles" @@ -349,22 +349,6 @@ files = [ docs = ["furo (>=2022.9.29)", "sphinx (>=5.3)", "sphinx-autodoc-typehints (>=1.19.5)"] testing = ["covdefaults (>=2.2.2)", "coverage (>=6.5)", "pytest (>=7.2)", "pytest-cov (>=4)", "pytest-timeout (>=2.1)"] -[[package]] -name = "flake8" -version = "4.0.1" -description = "the modular source code checker: pep8 pyflakes and co" -optional = false -python-versions = ">=3.6" -files = [ - {file = "flake8-4.0.1-py2.py3-none-any.whl", hash = "sha256:479b1304f72536a55948cb40a32dce8bb0ffe3501e26eaf292c7e60eb5e0428d"}, - {file = "flake8-4.0.1.tar.gz", hash = "sha256:806e034dda44114815e23c16ef92f95c91e4c71100ff52813adf7132a6ad870d"}, -] - -[package.dependencies] -mccabe = ">=0.6.0,<0.7.0" -pycodestyle = ">=2.8.0,<2.9.0" -pyflakes = ">=2.4.0,<2.5.0" - [[package]] name = "ghp-import" version = "2.1.0" @@ -473,23 +457,6 @@ files = [ {file = "iniconfig-1.1.1.tar.gz", hash = "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"}, ] -[[package]] -name = "isort" -version = "5.12.0" -description = "A Python utility / library to sort Python imports." 
-optional = false -python-versions = ">=3.8.0" -files = [ - {file = "isort-5.12.0-py3-none-any.whl", hash = "sha256:f84c2818376e66cf843d497486ea8fed8700b340f308f076c6fb1229dff318b6"}, - {file = "isort-5.12.0.tar.gz", hash = "sha256:8bef7dde241278824a6d83f44a544709b065191b95b6e50894bdc722fcba0504"}, -] - -[package.extras] -colors = ["colorama (>=0.4.3)"] -pipfile-deprecated-finder = ["pip-shims (>=0.5.2)", "pipreqs", "requirementslib"] -plugins = ["setuptools"] -requirements-deprecated-finder = ["pip-api", "pipreqs"] - [[package]] name = "jinja2" version = "3.1.2" @@ -606,17 +573,6 @@ chardet = ">=3.0.4,<6" [package.extras] test = ["Faker (>=1.0.2)", "pytest (>=6.0.1)", "pytest-md-report (>=0.1)"] -[[package]] -name = "mccabe" -version = "0.6.1" -description = "McCabe checker, plugin for flake8" -optional = false -python-versions = "*" -files = [ - {file = "mccabe-0.6.1-py2.py3-none-any.whl", hash = "sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42"}, - {file = "mccabe-0.6.1.tar.gz", hash = "sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"}, -] - [[package]] name = "mergedeep" version = "1.3.4" @@ -869,17 +825,6 @@ pyyaml = ">=5.1" toml = "*" virtualenv = ">=20.0.8" -[[package]] -name = "pycodestyle" -version = "2.8.0" -description = "Python style guide checker" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" -files = [ - {file = "pycodestyle-2.8.0-py2.py3-none-any.whl", hash = "sha256:720f8b39dde8b293825e7ff02c475f3077124006db4f440dcbc9a20b76548a20"}, - {file = "pycodestyle-2.8.0.tar.gz", hash = "sha256:eddd5847ef438ea1c7870ca7eb78a9d47ce0cdb4851a5523949f2601d0cbbe7f"}, -] - [[package]] name = "pydantic" version = "1.10.8" @@ -933,17 +878,6 @@ typing-extensions = ">=4.2.0" dotenv = ["python-dotenv (>=0.10.4)"] email = ["email-validator (>=1.0.3)"] -[[package]] -name = "pyflakes" -version = "2.4.0" -description = "passive checker of Python programs" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" -files = [ - {file = "pyflakes-2.4.0-py2.py3-none-any.whl", hash = "sha256:3bb3a3f256f4b7968c9c788781e4ff07dce46bdf12339dcda61053375426ee2e"}, - {file = "pyflakes-2.4.0.tar.gz", hash = "sha256:05a85c2872edf37a4ed30b0cce2f6093e1d0581f8c19d7393122da7e25b2b24c"}, -] - [[package]] name = "pygments" version = "2.14.0" @@ -1449,6 +1383,32 @@ pygments = ">=2.6.0,<3.0.0" [package.extras] jupyter = ["ipywidgets (>=7.5.1,<8.0.0)"] +[[package]] +name = "ruff" +version = "0.0.292" +description = "An extremely fast Python linter, written in Rust." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "ruff-0.0.292-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:02f29db018c9d474270c704e6c6b13b18ed0ecac82761e4fcf0faa3728430c96"}, + {file = "ruff-0.0.292-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:69654e564342f507edfa09ee6897883ca76e331d4bbc3676d8a8403838e9fade"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6c3c91859a9b845c33778f11902e7b26440d64b9d5110edd4e4fa1726c41e0a4"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f4476f1243af2d8c29da5f235c13dca52177117935e1f9393f9d90f9833f69e4"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:be8eb50eaf8648070b8e58ece8e69c9322d34afe367eec4210fdee9a555e4ca7"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:9889bac18a0c07018aac75ef6c1e6511d8411724d67cb879103b01758e110a81"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6bdfabd4334684a4418b99b3118793f2c13bb67bf1540a769d7816410402a205"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:aa7c77c53bfcd75dbcd4d1f42d6cabf2485d2e1ee0678da850f08e1ab13081a8"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8e087b24d0d849c5c81516ec740bf4fd48bf363cfb104545464e0fca749b6af9"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:f160b5ec26be32362d0774964e218f3fcf0a7da299f7e220ef45ae9e3e67101a"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ac153eee6dd4444501c4bb92bff866491d4bfb01ce26dd2fff7ca472c8df9ad0"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_i686.whl", hash = "sha256:87616771e72820800b8faea82edd858324b29bb99a920d6aa3d3949dd3f88fb0"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:b76deb3bdbea2ef97db286cf953488745dd6424c122d275f05836c53f62d4016"}, + {file = "ruff-0.0.292-py3-none-win32.whl", hash = "sha256:e854b05408f7a8033a027e4b1c7f9889563dd2aca545d13d06711e5c39c3d003"}, + {file = "ruff-0.0.292-py3-none-win_amd64.whl", hash = "sha256:f27282bedfd04d4c3492e5c3398360c9d86a295be00eccc63914438b4ac8a83c"}, + {file = "ruff-0.0.292-py3-none-win_arm64.whl", hash = "sha256:7f67a69c8f12fbc8daf6ae6d36705037bde315abf8b82b6e1f4c9e74eb750f68"}, + {file = "ruff-0.0.292.tar.gz", hash = "sha256:1093449e37dd1e9b813798f6ad70932b57cf614e5c2b5c51005bf67d55db33ac"}, +] + [[package]] name = "setuptools" version = "65.6.3" @@ -1809,4 +1769,4 @@ watchmedo = ["PyYAML (>=3.10)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "a97ca69785c7aeb0e04fad209cfd5694c54b5993faca02540679072c7179a875" +content-hash = "056b014fc985bda3ac9d33518eae39b228f776e84307ee4ddd28bd330a1c36e6" diff --git a/pyproject.toml b/pyproject.toml index 4d8a34e1e..7e0427eda 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,9 +45,8 @@ pytest-mock = "^3.10.0" pytest-timeout = "^2.1.0" snapshottest = "^0.6.0" pre-commit = "^2.19.0" -flake8 = "^4.0.1" +ruff = "^0.0.292" black = "^23.7.0" -isort = "^5.12.0" typer-cli = "^0.0.13" pyright = "^1.1.314" pytest-rerunfailures = "^11.1.2" @@ -67,6 +66,101 @@ mike = "^1.1.2" [tool.poetry_bumpversion.file."kpops/__init__.py"] +[tool.ruff] +ignore = [ + # "E203", # whitespace before ':' -- Not PEP8 compliant, black won't correct it, add when out of nursery + 
"E501", # Line too long -- Clashes with `black` + "D1", # Missing docstring for {} -- Inconvenient to enforce +# The following "D" rules do not correspond to our coding style. We use the pep257 convention, but +# "D212" should not be ignored. In ruff (0.0.291) we cannot select a rule that is excluded by specifying +# a convention, hence our only option is to manually replicate it. + "D203", # 1 blank line required before class docstring + "D213", # Multi-line docstring summary should start at the second line + "D214", # Section is over-indented ("{name}") + "D215", # Section underline is over-indented ("{name}") + "D404", # First word of the docstring should not be "This" + "D405", # Section name should be properly capitalized ("{name}") + "D406", # Section name should end with a newline ("{name}") + "D407", # Missing dashed underline after section ("{name}") + "D408", # Section underline should be in the line following the section's name ("{name}") + "D409", # Section underline should match the length of its name ("{name}") + "D410", # Missing blank line after section ("{name}") + "D411", # Missing blank line before section ("{name}") + "D413", # Missing blank line after last section ("{name}") + "D415", # First line should end with a period, question mark, or exclamation point + "D416", # Section name should end with a colon ("{name}") + "D417", # Missing argument description in the docstring for {definition}: {name} + "B009", # Do not call getattr with a constant attribute value. -- Not always applicable + "B010", # Do not call setattr with a constant attribute value. -- Not always applicable + "RUF012", # type class attrs with `ClassVar` -- Too strict/trigger-happy + "UP007", # Use X | Y for type annotations -- `typer` doesn't support it + "COM812", # Checks for the absence of trailing commas -- leads to undesirable behavior from formatters + "PIE804", # Unnecessary `dict` kwargs -- Inconvenient to enforce + "RET505", # Unnecessary {branch} after return statement -- Lots of false positives + "RET506", # Unnecessary {branch} after raise statement -- Lots of false positives + "RET507", # Unnecessary {branch} after continue statement -- Lots of false positives + "RET508", # Unnecessary {branch} after break statement -- Lots of false positives + "PLR09", # upper bound on number of arguments, functions, etc. 
-- Inconvenient to enforce + "PLR2004", # Magic value used in comparison, consider replacing {value} with a constant variable -- Inconvenient to enforce + "PLW2901", # `for` loop variable `{var}` overwritten by assignment target -- Inconvenient to enforce + "TRY002", # Create your own exception -- Inconvenient to enforce + "TRY003", # Avoid specifying long messages outside the exception class -- Inconvenient to enforce +] +select = [ + "F", # Pyflakes + "E", # pycodestyle Errors + "W", # pycodestyle Warnings + "C90", # mccabe + "I", # isort + "D", # pydocstyle + "UP", # pyupgrade + "B", # flake8-bugbear + "RUF", # Ruff-specific rules + "YTT", # flake8-2020 + "ASYNC", # flake8-async + "BLE", # flake8-blind-except + "COM", # flake8-commas + "C4", # flake8-comprehensions + "T10", # flake8-debugger + "EM", # flake8-errmsg + "FA", # flake8-future-annotations + "ISC", # flake8-implicit-str-concat + "ICN", # flake8-import-conventions + "INP", # flake8-no-pep420 + "PIE", # flake8-pie + "PT", # flake8-pytest-style + "Q", # flake8-quotes + "RSE", # flake8-raise + "RET", # flake8-return + "SLOT", # flake8-slots + "SIM", # flake8-simplify + "TCH", # flake8-type-checking, configure correctly and add + "PTH", # flake8-use-pathlib + "PGH", # pygrep-hooks + "PL", # Pylint + "TRY", # tryceratops + # "FURB", # refurb, add when out of nursery + # "LOG", # flake8-logging, add when out of nursery +] +output-format = "grouped" +show-fixes = true +task-tags = ["TODO", "HACK", "FIXME", "XXX"] +target-version = "py310" +exclude = ["tests/*snapshots/*"] + +[tool.ruff.extend-per-file-ignores] +"tests/*/__init__.py" = ["F401"] + +[tool.ruff.isort] +split-on-trailing-comma = false + +[tool.ruff.flake8-bugbear] +extend-immutable-calls = ["typer.Argument"] + +[tool.ruff.flake8-type-checking] +runtime-evaluated-base-classes = ["pydantic.BaseModel"] + [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 89429d3e0..000000000 --- a/setup.cfg +++ /dev/null @@ -1,19 +0,0 @@ -[flake8] -exclude = - .git, - __pycache__ -max-complexity = 10 -# black docs regarding flake8: https://black.readthedocs.io/en/stable/guides/using_black_with_other_tools.html#flake8 -# black enforces an equal amount of whitespace around slice operators. It is not PEP8 compliant. 
-# black and flake8 also disagree on line length -extend-ignore = - # E203: Whitespace before ':' - E203, - # E501: Line too long - E501, -per-file-ignores = - # F401: unused imports - tests/*/__init__.py: F401 - -[isort] -profile = black diff --git a/tests/cli/snapshots/snap_test_schema_generation.py b/tests/cli/snapshots/snap_test_schema_generation.py index 2a19e65c1..2dd92b512 100644 --- a/tests/cli/snapshots/snap_test_schema_generation.py +++ b/tests/cli/snapshots/snap_test_schema_generation.py @@ -58,7 +58,7 @@ }, "FromSection": { "additionalProperties": false, - "description": "Holds multiple input topics", + "description": "Holds multiple input topics.", "properties": { "components": { "additionalProperties": { @@ -84,7 +84,7 @@ }, "FromTopic": { "additionalProperties": false, - "description": "Input topic", + "description": "Input topic.", "properties": { "role": { "description": "Custom identifier belonging to a topic; define only if `type` is `pattern` or `None`", @@ -104,7 +104,7 @@ "type": "object" }, "InputTopicTypes": { - "description": "Input topic types\\n\\nINPUT (input topic), PATTERN (extra-topic-pattern or input-topic-pattern)", + "description": "Input topic types.\\n\\nINPUT (input topic), PATTERN (extra-topic-pattern or input-topic-pattern)", "enum": [ "input", "pattern" @@ -113,7 +113,7 @@ "type": "string" }, "OutputTopicTypes": { - "description": "Types of output topic\\n\\nOUTPUT (output topic), ERROR (error topic)", + "description": "Types of output topic.\\n\\nOUTPUT (output topic), ERROR (error topic)", "enum": [ "output", "error" @@ -216,7 +216,7 @@ "type": "object" }, "SubPipelineComponentCorrectDocstr": { - "description": "Newline before title is removed\\nSummarry is correctly imported. All whitespaces are removed and replaced with a single space. The description extraction terminates at the correct place, deletes 1 trailing coma", + "description": "Newline before title is removed.\\nSummarry is correctly imported. All whitespaces are removed and replaced with a single space. The description extraction terminates at the correct place, deletes 1 trailing coma", "properties": { "example_attr": { "description": "Parameter description looks correct and it is not included in the class description, terminates here", @@ -254,7 +254,7 @@ }, "type": { "default": "sub-pipeline-component-correct-docstr", - "description": "Newline before title is removed\\nSummarry is correctly imported. All whitespaces are removed and replaced with a single space. The description extraction terminates at the correct place, deletes 1 trailing coma", + "description": "Newline before title is removed.\\nSummarry is correctly imported. All whitespaces are removed and replaced with a single space. 
The description extraction terminates at the correct place, deletes 1 trailing coma",
            "enum": [
                "sub-pipeline-component-correct-docstr"
            ],
@@ -317,7 +317,7 @@
             "type": "object"
         },
         "ToSection": {
-            "description": "Holds multiple output topics",
+            "description": "Holds multiple output topics.",
             "properties": {
                 "models": {
                     "additionalProperties": {
@@ -343,7 +343,7 @@
         },
         "TopicConfig": {
             "additionalProperties": false,
-            "description": "Configure an output topic",
+            "description": "Configure an output topic.",
             "properties": {
                 "configs": {
                     "additionalProperties": {
diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py
index 6c651dfa4..cbb855d14 100644
--- a/tests/cli/test_schema_generation.py
+++ b/tests/cli/test_schema_generation.py
@@ -3,16 +3,19 @@
 import logging
 from abc import ABC, abstractmethod
 from pathlib import Path
+from typing import TYPE_CHECKING

 import pytest
 from pydantic import Field
-from snapshottest.module import SnapshotTest
 from typer.testing import CliRunner

-import tests.cli.resources.empty_module as empty_module
 from kpops.cli.main import app
 from kpops.components.base_components import PipelineComponent
 from kpops.utils.docstring import describe_attr
+from tests.cli.resources import empty_module
+
+if TYPE_CHECKING:
+    from snapshottest.module import SnapshotTest

 RESOURCE_PATH = Path(__file__).parent / "resources"

@@ -54,8 +57,7 @@ class SubPipelineComponentCorrect(SubPipelineComponent):

 # Correctly defined, docstr test
 class SubPipelineComponentCorrectDocstr(SubPipelineComponent):
-    """
-    Newline before title is removed
+    """Newline before title is removed.

     Summarry is correctly imported. All
@@ -108,7 +110,7 @@ def test_gen_pipeline_schema_no_modules(self, caplog: pytest.LogCaptureFixture):

     def test_gen_pipeline_schema_no_components(self):
         with pytest.raises(RuntimeError, match="^No valid components found.$"):
-            result = runner.invoke(
+            runner.invoke(
                 app,
                 [
                     "schema",
                     "pipeline",
                     "--no-include-stock-components",
                 ],
                 catch_exceptions=False,
             )
-        assert result.exit_code == 1

     def test_gen_pipeline_schema_only_stock_module(self):
         result = runner.invoke(
diff --git a/tests/compiler/test_pipeline_name.py b/tests/compiler/test_pipeline_name.py
index 7a07c1a12..f0a1b1b1e 100644
--- a/tests/compiler/test_pipeline_name.py
+++ b/tests/compiler/test_pipeline_name.py
@@ -8,49 +8,51 @@

 DEFAULTS_PATH = Path(__file__).parent / "resources"
 PIPELINE_PATH = Path("./some/random/path/for/testing/pipeline.yaml")
-DEFAULT_BASE_DIR = Path(".")
+DEFAULT_BASE_DIR = Path()


 def test_should_set_pipeline_name_with_default_base_dir():
     Pipeline.set_pipeline_name_env_vars(DEFAULT_BASE_DIR, PIPELINE_PATH)

-    assert "some-random-path-for-testing" == ENV["pipeline_name"]
-    assert "some" == ENV["pipeline_name_0"]
-    assert "random" == ENV["pipeline_name_1"]
-    assert "path" == ENV["pipeline_name_2"]
-    assert "for" == ENV["pipeline_name_3"]
-    assert "testing" == ENV["pipeline_name_4"]
+    assert ENV["pipeline_name"] == "some-random-path-for-testing"
+    assert ENV["pipeline_name_0"] == "some"
+    assert ENV["pipeline_name_1"] == "random"
+    assert ENV["pipeline_name_2"] == "path"
+    assert ENV["pipeline_name_3"] == "for"
+    assert ENV["pipeline_name_4"] == "testing"


 def test_should_set_pipeline_name_with_specific_relative_base_dir():
     Pipeline.set_pipeline_name_env_vars(Path("./some/random/path"), PIPELINE_PATH)

-    assert "for-testing" == ENV["pipeline_name"]
-    assert "for" == ENV["pipeline_name_0"]
-    assert "testing" == ENV["pipeline_name_1"]
+    assert ENV["pipeline_name"] == "for-testing"
+    assert ENV["pipeline_name_0"] == "for"
+    assert ENV["pipeline_name_1"] == "testing"


 def test_should_set_pipeline_name_with_specific_absolute_base_dir():
     Pipeline.set_pipeline_name_env_vars(Path("some/random/path"), PIPELINE_PATH)

-    assert "for-testing" == ENV["pipeline_name"]
-    assert "for" == ENV["pipeline_name_0"]
-    assert "testing" == ENV["pipeline_name_1"]
+    assert ENV["pipeline_name"] == "for-testing"
+    assert ENV["pipeline_name_0"] == "for"
+    assert ENV["pipeline_name_1"] == "testing"


 def test_should_set_pipeline_name_with_absolute_base_dir():
     Pipeline.set_pipeline_name_env_vars(Path.cwd(), PIPELINE_PATH)

-    assert "some-random-path-for-testing" == ENV["pipeline_name"]
-    assert "some" == ENV["pipeline_name_0"]
-    assert "random" == ENV["pipeline_name_1"]
-    assert "path" == ENV["pipeline_name_2"]
-    assert "for" == ENV["pipeline_name_3"]
-    assert "testing" == ENV["pipeline_name_4"]
+    assert ENV["pipeline_name"] == "some-random-path-for-testing"
+    assert ENV["pipeline_name_0"] == "some"
+    assert ENV["pipeline_name_1"] == "random"
+    assert ENV["pipeline_name_2"] == "path"
+    assert ENV["pipeline_name_3"] == "for"
+    assert ENV["pipeline_name_4"] == "testing"


 def test_should_not_set_pipeline_name_with_the_same_base_dir():
-    with pytest.raises(ValueError):
+    with pytest.raises(
+        ValueError, match="The pipeline-base-dir should not equal the pipeline-path"
+    ):
         Pipeline.set_pipeline_name_env_vars(PIPELINE_PATH, PIPELINE_PATH)
diff --git a/tests/component_handlers/helm_wrapper/test_dry_run_handler.py b/tests/component_handlers/helm_wrapper/test_dry_run_handler.py
index 20c02f50d..bad4f2aa8 100644
--- a/tests/component_handlers/helm_wrapper/test_dry_run_handler.py
+++ b/tests/component_handlers/helm_wrapper/test_dry_run_handler.py
@@ -12,13 +12,13 @@


 class TestDryRunHandler:
-    @pytest.fixture
+    @pytest.fixture()
     def helm_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.component_handlers.helm_wrapper.dry_run_handler.Helm"
         ).return_value

-    @pytest.fixture
+    @pytest.fixture()
     def helm_diff_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.component_handlers.helm_wrapper.dry_run_handler.HelmDiff"
diff --git a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py
index de23dca8e..ce6fae709 100644
--- a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py
+++ b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py
@@ -29,15 +29,15 @@
         temp_file_mock.return_value.__enter__.return_value.name = "values.yaml"
         return temp_file_mock

-    @pytest.fixture
+    @pytest.fixture()
    def run_command(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch.object(Helm, "_Helm__execute")

-    @pytest.fixture
+    @pytest.fixture()
     def log_warning_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch("kpops.component_handlers.helm_wrapper.helm.log.warning")

-    @pytest.fixture
+    @pytest.fixture()
     def mock_get_version(self, mocker: MockerFixture) -> MagicMock:
         mock_get_version = mocker.patch.object(Helm, "get_version")
         mock_get_version.return_value = Version(major=3, minor=12, patch=0)
@@ -337,8 +337,7 @@ def test_raise_parse_error_when_helm_content_is_invalid(self):
             """
         )
         with pytest.raises(ParseError, match="Not a valid Helm template source"):
-            helm_template = list(Helm.load_manifest(stdout))
-            assert len(helm_template) == 0
+            list(Helm.load_manifest(stdout))

     def test_load_manifest(self):
         stdout = dedent(
@@ -498,7 +497,7 @@ def test_should_call_run_command_method_when_helm_template_without_optional_args
         )

     @pytest.mark.parametrize(
-        "raw_version, expected_version",
+        ("raw_version", "expected_version"),
         [
             ("v3.12.0+gc9f554d", Version(3, 12, 0)),
             ("v3.12.0", Version(3, 12, 0)),
diff --git a/tests/component_handlers/kafka_connect/test_connect_handler.py b/tests/component_handlers/kafka_connect/test_connect_handler.py
index a5a1f3246..db64690e9 100644
--- a/tests/component_handlers/kafka_connect/test_connect_handler.py
+++ b/tests/component_handlers/kafka_connect/test_connect_handler.py
@@ -22,25 +22,25 @@


 class TestConnectorHandler:
-    @pytest.fixture
+    @pytest.fixture()
     def log_info_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.info"
         )

-    @pytest.fixture
+    @pytest.fixture()
     def log_warning_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.warning"
         )

-    @pytest.fixture
+    @pytest.fixture()
     def log_error_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.error"
         )

-    @pytest.fixture
+    @pytest.fixture()
     def renderer_diff_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.component_handlers.kafka_connect.kafka_connect_handler.render_diff"
@@ -53,7 +53,7 @@ def connector_handler(connect_wrapper: MagicMock) -> KafkaConnectHandler:
             timeout=0,
         )

-    @pytest.fixture
+    @pytest.fixture()
     def connector_config(self) -> KafkaConnectorConfig:
         return KafkaConnectorConfig(
             **{
diff --git a/tests/component_handlers/kafka_connect/test_connect_wrapper.py b/tests/component_handlers/kafka_connect/test_connect_wrapper.py
index 3db9c090f..8e60d92a7 100644
--- a/tests/component_handlers/kafka_connect/test_connect_wrapper.py
+++ b/tests/component_handlers/kafka_connect/test_connect_wrapper.py
@@ -26,7 +26,7 @@

 class TestConnectorApiWrapper:
     @pytest.fixture(autouse=True)
-    def setup(self):
+    def _setup(self):
         config = PipelineConfig(
             defaults_path=DEFAULTS_PATH,
             environment="development",
@@ -34,7 +34,7 @@ def setup(self):
         )
         self.connect_wrapper = ConnectWrapper(host=config.kafka_connect_host)

-    @pytest.fixture
+    @pytest.fixture()
     def connector_config(self) -> KafkaConnectorConfig:
         return KafkaConnectorConfig(
             **{
@@ -495,9 +495,9 @@ def test_should_create_correct_validate_connector_config_and_name_gets_added(
         )

     def test_should_parse_validate_connector_config(self, httpx_mock: HTTPXMock):
-        with open(
+        with Path(
             DEFAULTS_PATH / "connect_validation_response.json",
-        ) as f:
+        ).open() as f:
             actual_response = json.load(f)
         httpx_mock.add_response(
             method="PUT",
diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py
index ccea021c6..faf54ba09 100644
--- a/tests/component_handlers/schema_handler/test_schema_handler.py
+++ b/tests/component_handlers/schema_handler/test_schema_handler.py
@@ -116,45 +116,48 @@ def test_should_raise_value_error_if_schema_provider_class_not_found():
         url="http://mock:8081", components_module=NON_EXISTING_PROVIDER_MODULE
     )

-    with pytest.raises(ValueError) as value_error:
+    with pytest.raises(
+        ValueError,
+        match="No schema provider found in components module pydantic.main. "
+        "Please implement the abstract method in "
+        f"{SchemaProvider.__module__}.{SchemaProvider.__name__}.",
+    ):
         schema_handler.schema_provider.provide_schema(
             "com.bakdata.kpops.test.SchemaHandlerTest", {}
         )

-    assert (
-        str(value_error.value)
-        == "No schema provider found in components module pydantic.main. "
-        "Please implement the abstract method in "
-        f"{SchemaProvider.__module__}.{SchemaProvider.__name__}."
-    )
-
-def test_should_raise_value_error_when_schema_provider_is_called_and_components_module_is_empty():
+@pytest.mark.parametrize(
+    ("components_module"),
+    [
+        pytest.param(
+            None,
+            id="components_module = None",
+        ),
+        pytest.param(
+            "",
+            id="components_module = ''",
+        ),
+    ],
+)
+def test_should_raise_value_error_when_schema_provider_is_called_and_components_module_is_empty(
+    components_module: str,
+):
     config_enable = PipelineConfig(
         defaults_path=Path("fake"),
         environment="development",
         schema_registry_url="http://localhost:8081",
     )

-    with pytest.raises(ValueError):
-        schema_handler = SchemaHandler.load_schema_handler(None, config_enable)
-        assert schema_handler is not None
-        schema_handler.schema_provider.provide_schema(
-            "com.bakdata.kpops.test.SchemaHandlerTest", {}
-        )
-
-    with pytest.raises(ValueError) as value_error:
-        schema_handler = SchemaHandler.load_schema_handler("", config_enable)
-        assert schema_handler is not None
+    schema_handler = SchemaHandler.load_schema_handler(components_module, config_enable)
+    assert schema_handler is not None
+    with pytest.raises(
+        ValueError,
+        match="The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your SchemaProvider implementation exists.",
+    ):
         schema_handler.schema_provider.provide_schema(
             "com.bakdata.kpops.test.SchemaHandlerTest", {}
         )

-    assert (
-        str(value_error.value)
-        == "The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your SchemaProvider implementation exists."
-    )

 def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true(
     to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock
@@ -210,10 +213,9 @@ def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatibl
     schema_registry_mock.check_version.return_value = None
     schema_registry_mock.test_compatibility.return_value = False

-    with pytest.raises(Exception) as exception:
+    with pytest.raises(Exception, match="Schema is not compatible for") as exception:
         schema_handler.submit_schemas(to_section, True)
-    assert "Schema is not compatible for" in str(exception.value)

 EXPECTED_SCHEMA = {
     "type": "record",
     "name": "KPOps.Employee",
diff --git a/tests/component_handlers/topic/test_proxy_wrapper.py b/tests/component_handlers/topic/test_proxy_wrapper.py
index 7b587ecb3..e26fb0e5a 100644
--- a/tests/component_handlers/topic/test_proxy_wrapper.py
+++ b/tests/component_handlers/topic/test_proxy_wrapper.py
@@ -30,15 +30,15 @@
         return mocker.patch("kpops.component_handlers.topic.proxy_wrapper.log.debug")

     @pytest.fixture(autouse=True)
-    def setup(self, httpx_mock: HTTPXMock):
+    def _setup(self, httpx_mock: HTTPXMock):
         config = PipelineConfig(
             defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST
         )
         self.proxy_wrapper = ProxyWrapper(pipeline_config=config)

-        with open(
-            DEFAULTS_PATH / "kafka_rest_proxy_responses" / "cluster-info.json"
-        ) as f:
+        with Path(
+            DEFAULTS_PATH / "kafka_rest_proxy_responses" / "cluster-info.json",
+        ).open() as f:
             cluster_response = json.load(f)

         httpx_mock.add_response(
@@ -53,12 +53,11 @@ def setup(self, httpx_mock: HTTPXMock):
     def test_should_raise_exception_when_host_is_not_set(self):
         config = PipelineConfig(defaults_path=DEFAULTS_PATH, environment="development")
         config.kafka_rest_host = None
-        with pytest.raises(ValueError) as exception:
+        with pytest.raises(
+            ValueError,
+            match="The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST.",
+        ):
             ProxyWrapper(pipeline_config=config)
-        assert (
-            str(exception.value)
-            == "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST."
-        )

     @patch("httpx.post")
     def test_should_create_topic_with_all_topic_configuration(
diff --git a/tests/component_handlers/topic/test_topic_handler.py b/tests/component_handlers/topic/test_topic_handler.py
index c53a7a60d..6b1b017fc 100644
--- a/tests/component_handlers/topic/test_topic_handler.py
+++ b/tests/component_handlers/topic/test_topic_handler.py
@@ -51,19 +51,19 @@ def log_error_mock(self, mocker: MockerFixture) -> MagicMock:

     @pytest.fixture(autouse=True)
     def get_topic_response_mock(self) -> MagicMock:
-        with open(
-            DEFAULTS_PATH / "kafka_rest_proxy_responses/get_topic_response.json"
-        ) as f:
+        with Path(
+            DEFAULTS_PATH / "kafka_rest_proxy_responses/get_topic_response.json",
+        ).open() as f:
             response = json.load(f)

-        with open(
-            DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json"
-        ) as f:
+        with Path(
+            DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json",
+        ).open() as f:
             broker_response = json.load(f)

-        with open(
-            DEFAULTS_PATH / "kafka_rest_proxy_responses/topic_config_response.json"
-        ) as f:
+        with Path(
+            DEFAULTS_PATH / "kafka_rest_proxy_responses/topic_config_response.json",
+        ).open() as f:
             response_topic_config = json.load(f)

         wrapper = MagicMock()
@@ -76,14 +76,15 @@ def get_topic_response_mock(self) -> MagicMock:

     @pytest.fixture(autouse=True)
     def get_default_topic_response_mock(self) -> MagicMock:
-        with open(
-            DEFAULTS_PATH / "kafka_rest_proxy_responses/get_default_topic_response.json"
-        ) as f:
+        with Path(
+            DEFAULTS_PATH
+            / "kafka_rest_proxy_responses/get_default_topic_response.json",
+        ).open() as f:
             response = json.load(f)

-        with open(
-            DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json"
-        ) as f:
+        with Path(
+            DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json",
+        ).open() as f:
             broker_response = json.load(f)

         wrapper = MagicMock()
@@ -369,7 +370,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_partition_count(
             match="Topic Creation: partition count of topic topic-X changed! Partitions count of topic topic-X is 10. The given partitions count 200.",
         ):
             topic_handler.create_topics(to_section=to_section, dry_run=True)
-        wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff
+        wrapper.get_topic_config.assert_called_once()  # dry run requests the config to create the diff

     def test_should_exit_if_dry_run_and_topic_exists_different_replication_factor(
         self, get_topic_response_mock: MagicMock
@@ -391,7 +392,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_replication_factor(
             match="Topic Creation: replication factor of topic topic-X changed! Replication factor of topic topic-X is 3. The given replication count 300.",
         ):
             topic_handler.create_topics(to_section=to_section, dry_run=True)
-        wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff
+        wrapper.get_topic_config.assert_called_once()  # dry run requests the config to create the diff

     def test_should_log_correct_message_when_delete_existing_topic_dry_run(
         self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock
diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py
index 7b25e5f74..d066d431b 100644
--- a/tests/components/test_base_defaults_component.py
+++ b/tests/components/test_base_defaults_component.py
@@ -37,7 +37,7 @@ class EnvVarTest(BaseDefaultsComponent):
     name: str | None = None


-@pytest.fixture
+@pytest.fixture()
 def config() -> PipelineConfig:
     return PipelineConfig(
         defaults_path=DEFAULTS_PATH,
@@ -45,7 +45,7 @@ def config() -> PipelineConfig:
     )


-@pytest.fixture
+@pytest.fixture()
 def handlers() -> ComponentHandlers:
     return ComponentHandlers(
         schema_handler=MagicMock(),
diff --git a/tests/components/test_kafka_app.py b/tests/components/test_kafka_app.py
index c6527c00c..8fd0d98ec 100644
--- a/tests/components/test_kafka_app.py
+++ b/tests/components/test_kafka_app.py
@@ -17,7 +17,7 @@


 class TestKafkaApp:
-    @pytest.fixture
+    @pytest.fixture()
     def config(self) -> PipelineConfig:
         return PipelineConfig(
             defaults_path=DEFAULTS_PATH,
@@ -25,7 +25,7 @@ def config(self) -> PipelineConfig:
             helm_diff_config=HelmDiffConfig(),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def handlers(self) -> ComponentHandlers:
         return ComponentHandlers(
             schema_handler=MagicMock(),
diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py
index 912f449fb..2adf867da 100644
--- a/tests/components/test_kafka_connector.py
+++ b/tests/components/test_kafka_connector.py
@@ -18,7 +18,7 @@


 class TestKafkaConnector:
-    @pytest.fixture
+    @pytest.fixture()
     def config(self) -> PipelineConfig:
         return PipelineConfig(
             defaults_path=DEFAULTS_PATH,
@@ -31,7 +31,7 @@ def config(self) -> PipelineConfig:
             helm_diff_config=HelmDiffConfig(),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def handlers(self) -> ComponentHandlers:
         return ComponentHandlers(
             schema_handler=MagicMock(),
@@ -45,13 +45,13 @@
             "kpops.components.base_components.kafka_connector.Helm"
         ).return_value

-    @pytest.fixture
+    @pytest.fixture()
     def dry_run_handler(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.components.base_components.kafka_connector.DryRunHandler"
         ).return_value

-    @pytest.fixture
+    @pytest.fixture()
     def connector_config(self) -> KafkaConnectorConfig:
         return KafkaConnectorConfig(
             **{
@@ -79,7 +79,7 @@ def test_connector_config_name_override(
             name=CONNECTOR_NAME,
             config=config,
             handlers=handlers,
-            app={"connector.class": CONNECTOR_CLASS},  # type: ignore
+            app={"connector.class": CONNECTOR_CLASS},  # type: ignore[reportGeneralTypeIssues]
             namespace="test-namespace",
         )
         assert connector.app.name == CONNECTOR_FULL_NAME
@@ -91,7 +91,7 @@ def test_connector_config_name_override(
             name=CONNECTOR_NAME,
             config=config,
             handlers=handlers,
-            app={"connector.class": CONNECTOR_CLASS, "name": "different-name"},  # type: ignore
+            app={"connector.class": CONNECTOR_CLASS, "name": "different-name"},  # type: ignore[reportGeneralTypeIssues]
             namespace="test-namespace",
         )

@@ -102,6 +102,6 @@ def test_connector_config_name_override(
             name=CONNECTOR_NAME,
             config=config,
             handlers=handlers,
-            app={"connector.class": CONNECTOR_CLASS, "name": ""},  # type: ignore
+            app={"connector.class": CONNECTOR_CLASS, "name": ""},  # type: ignore[reportGeneralTypeIssues]
             namespace="test-namespace",
         )
diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py
index 91760e90c..e8ed7aa22 100644
--- a/tests/components/test_kafka_sink_connector.py
+++ b/tests/components/test_kafka_sink_connector.py
@@ -35,11 +35,11 @@


 class TestKafkaSinkConnector(TestKafkaConnector):
-    @pytest.fixture
+    @pytest.fixture()
     def log_info_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch("kpops.components.base_components.kafka_connector.log.info")

-    @pytest.fixture
+    @pytest.fixture()
     def connector(
         self,
         config: PipelineConfig,
diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py
index db9a2dd77..169111ed3 100644
--- a/tests/components/test_kafka_source_connector.py
+++ b/tests/components/test_kafka_source_connector.py
@@ -32,7 +32,7 @@


 class TestKafkaSourceConnector(TestKafkaConnector):
-    @pytest.fixture
+    @pytest.fixture()
     def connector(
         self,
         config: PipelineConfig,
diff --git a/tests/components/test_kubernetes_app.py b/tests/components/test_kubernetes_app.py
index 46eb9795d..6583ac4bf 100644
--- a/tests/components/test_kubernetes_app.py
+++ b/tests/components/test_kubernetes_app.py
@@ -27,7 +27,7 @@ class KubernetesTestValue(KubernetesAppConfig):


 class TestKubernetesApp:
-    @pytest.fixture
+    @pytest.fixture()
     def config(self) -> PipelineConfig:
         return PipelineConfig(
             defaults_path=DEFAULTS_PATH,
@@ -35,7 +35,7 @@ def config(self) -> PipelineConfig:
             helm_diff_config=HelmDiffConfig(),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def handlers(self) -> ComponentHandlers:
         return ComponentHandlers(
             schema_handler=MagicMock(),
@@ -43,25 +43,25 @@ def handlers(self) -> ComponentHandlers:
             topic_handler=MagicMock(),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def helm_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
             "kpops.components.base_components.kubernetes_app.Helm"
         ).return_value

-    @pytest.fixture
+    @pytest.fixture()
     def log_info_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch("kpops.components.base_components.kubernetes_app.log.info")

-    @pytest.fixture
+    @pytest.fixture()
     def app_value(self) -> KubernetesTestValue:
         return KubernetesTestValue(**{"name_override": "test-value"})

-    @pytest.fixture
+    @pytest.fixture()
     def repo_config(self) -> HelmRepoConfig:
         return HelmRepoConfig(repository_name="test", url="https://bakdata.com")

-    @pytest.fixture
+    @pytest.fixture()
     def kubernetes_app(
         self,
         config: PipelineConfig,
@@ -195,8 +195,8 @@ def test_should_raise_not_implemented_error_when_helm_chart_is_not_set(
             kubernetes_app.deploy(True)
         helm_mock.add_repo.assert_called()
         assert (
-            "Please implement the helm_chart property of the kpops.components.base_components.kubernetes_app module."
-            == str(error.value)
+            str(error.value)
+            == "Please implement the helm_chart property of the kpops.components.base_components.kubernetes_app module."
         )

     def test_should_call_helm_uninstall_when_destroying_kubernetes_app(
diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py
index 56d52a68b..84f9f86c6 100644
--- a/tests/components/test_producer_app.py
+++ b/tests/components/test_producer_app.py
@@ -21,7 +21,7 @@ class TestProducerApp:
     PRODUCER_APP_NAME = "test-producer-app-with-long-name-0123456789abcdefghijklmnop"
     PRODUCER_APP_CLEAN_NAME = "test-producer-app-with-long-n-clean"

-    @pytest.fixture
+    @pytest.fixture()
     def handlers(self) -> ComponentHandlers:
         return ComponentHandlers(
             schema_handler=MagicMock(),
@@ -29,7 +29,7 @@ def handlers(self) -> ComponentHandlers:
             topic_handler=MagicMock(),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def config(self) -> PipelineConfig:
         return PipelineConfig(
             defaults_path=DEFAULTS_PATH,
@@ -40,7 +40,7 @@ def config(self) -> PipelineConfig:
             ),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def producer_app(
         self, config: PipelineConfig, handlers: ComponentHandlers
     ) -> ProducerApp:
diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py
index dce2c7e96..0d9135b54 100644
--- a/tests/components/test_streams_app.py
+++ b/tests/components/test_streams_app.py
@@ -25,7 +25,7 @@ class TestStreamsApp:
     STREAMS_APP_NAME = "test-streams-app-with-long-name-0123456789abcdefghijklmnop"
     STREAMS_APP_CLEAN_NAME = "test-streams-app-with-long-na-clean"

-    @pytest.fixture
+    @pytest.fixture()
     def handlers(self) -> ComponentHandlers:
         return ComponentHandlers(
             schema_handler=MagicMock(),
@@ -33,7 +33,7 @@ def handlers(self) -> ComponentHandlers:
             topic_handler=MagicMock(),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def config(self) -> PipelineConfig:
         return PipelineConfig(
             defaults_path=DEFAULTS_PATH,
@@ -45,7 +45,7 @@ def config(self) -> PipelineConfig:
             helm_diff_config=HelmDiffConfig(),
         )

-    @pytest.fixture
+    @pytest.fixture()
     def streams_app(
         self, config: PipelineConfig, handlers: ComponentHandlers
     ) -> StreamsApp:
@@ -145,7 +145,9 @@ def test_no_empty_input_topic(

     def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandlers):
         # An exception should be raised when both role and type are defined and type is input
-        with pytest.raises(ValueError):
+        with pytest.raises(
+            ValueError, match="Define role only if `type` is `pattern` or `None`"
+        ):
             StreamsApp(
                 name=self.STREAMS_APP_NAME,
                 config=config,
@@ -167,7 +169,9 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle
         )

         # An exception should be raised when both role and type are defined and type is error
-        with pytest.raises(ValueError):
+        with pytest.raises(
+            ValueError, match="Define `role` only if `type` is undefined"
+        ):
             StreamsApp(
                 name=self.STREAMS_APP_NAME,
                 config=config,
diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py
index cb58d19f0..86e2c8b8e 100644
--- a/tests/pipeline/test_components/components.py
+++ b/tests/pipeline/test_components/components.py
@@ -44,7 +44,8 @@ def inflate(self) -> list[PipelineComponent]:
                     config=self.config,
                     handlers=self.handlers,
                     namespace="example-namespace",
-                    app={  # type: ignore # FIXME
+                    # FIXME
+                    app={  # type: ignore[reportGeneralTypeIssues]
                         "topics": topic_name,
                         "transforms.changeTopic.replacement": f"{topic_name}-index-v1",
                     },
@@ -64,7 +65,7 @@ def inflate(self) -> list[PipelineComponent]:
                     name=f"{self.name}-inflated-streams-app",
                     config=self.config,
                     handlers=self.handlers,
-                    to=ToSection(  # type: ignore
+                    to=ToSection(  # type: ignore[reportGeneralTypeIssues]
                         topics={
                             TopicName(
                                 f"{self.full_name}-" + "${component_name}"
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index af9cde479..433960e74 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -3,7 +3,6 @@

 import pytest
 import yaml
-from pytest import MonkeyPatch
 from snapshottest.module import SnapshotTest
 from typer.testing import CliRunner

@@ -461,7 +460,7 @@ def test_default_config(self, snapshot: SnapshotTest):

     def test_env_vars_precedence_over_config(
         self,
-        monkeypatch: MonkeyPatch,
+        monkeypatch: pytest.MonkeyPatch,
         snapshot: SnapshotTest,
     ):
         monkeypatch.setenv(name="KPOPS_KAFKA_BROKERS", value="env_broker")
diff --git a/tests/pipeline/test_template.py b/tests/pipeline/test_template.py
index cd4436b7a..a43fbec5b 100644
--- a/tests/pipeline/test_template.py
+++ b/tests/pipeline/test_template.py
@@ -15,7 +15,7 @@


 class TestTemplate:
-    @pytest.fixture
+    @pytest.fixture()
     def run_command(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch.object(Helm, "_Helm__execute")

diff --git a/tests/utils/test_environment.py b/tests/utils/test_environment.py
index 09bbb75de..8fc02c826 100644
--- a/tests/utils/test_environment.py
+++ b/tests/utils/test_environment.py
@@ -5,12 +5,12 @@
 from kpops.utils.environment import Environment


-@pytest.fixture
+@pytest.fixture()
 def fake_environment_windows():
     return {"MY": "fake", "ENVIRONMENT": "here"}


-@pytest.fixture
+@pytest.fixture()
 def fake_environment_linux():
     return {"my": "fake", "environment": "here"}

From 10f02a907376a42d9e2c2f3c5cca33011bb30e0a Mon Sep 17 00:00:00 2001
From: Salomon Popp
Date: Wed, 11 Oct 2023 10:59:04 +0200
Subject: [PATCH 2/4] Print details on connector name mismatch error (#369)

---
 .gitignore                                          |  1 +
 kpops/components/base_components/kafka_connector.py |  2 +-
 tests/components/test_kafka_connector.py            | 11 +++++++++--
 .../pipeline-with-env-defaults/defaults.yaml        |  4 ++--
 .../defaults_development.yaml                       |  2 +-
 5 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/.gitignore b/.gitignore
index db58e072b..7724b5bab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
 pipelines/
 defaults/
 site/
+scratch*
diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py
index 96ee68041..f77588401 100644
--- a/kpops/components/base_components/kafka_connector.py
+++ b/kpops/components/base_components/kafka_connector.py
@@ -87,7 +87,7 @@ def connector_config_should_have_component_name(
         component_name = values["prefix"] + values["name"]
         connector_name: str | None = app.get("name")
         if connector_name is not None and connector_name != component_name:
-            msg = "Connector name should be the same as component name"
+            msg = f"Connector name '{connector_name}' should be the same as component name '{component_name}'"
             raise ValueError(msg)
         app["name"] = component_name
         return app
diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py
index 2adf867da..98771d4af 100644
--- a/tests/components/test_kafka_connector.py
+++ b/tests/components/test_kafka_connector.py
@@ -1,3 +1,4 @@
+import re
 from pathlib import Path
 from unittest.mock import MagicMock

@@ -85,7 +86,10 @@ def test_connector_config_name_override(
         assert connector.app.name == CONNECTOR_FULL_NAME

         with pytest.raises(
-            ValueError, match="Connector name should be the same as component name"
+            ValueError,
+            match=re.escape(
+                f"Connector name 'different-name' should be the same as component name '{CONNECTOR_FULL_NAME}'"
+            ),
         ):
             KafkaConnector(
                 name=CONNECTOR_NAME,
@@ -96,7 +100,10 @@
         )

         with pytest.raises(
-            ValueError, match="Connector name should be the same as component name"
+            ValueError,
+            match=re.escape(
+                f"Connector name '' should be the same as component name '{CONNECTOR_FULL_NAME}'"
+            ),
         ):
             KafkaConnector(
                 name=CONNECTOR_NAME,
diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml
index 2564e0012..639c176e5 100644
--- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml
+++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml
@@ -1,5 +1,5 @@
 kubernetes-app:
-  name: "${component_type}"
+  name: ${component_type}
   namespace: example-namespace
 kafka-app:
   app:
@@ -24,7 +24,7 @@ streams-app:  # inherits from kafka-app
       cleanup.policy: compact,delete
 kafka-connector:
-  name: "sink-connector"
+  name: sink-connector
   app:
     batch.size: "2000"
     behavior.on.malformed.documents: "warn"
diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml
index 035691c2e..c7b863a92 100644
--- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml
+++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults_development.yaml
@@ -1,3 +1,3 @@
 kubernetes-app:
-  name: "${component_type}-development"
+  name: ${component_type}-development
   namespace: development-namespace

From e5905eb53845353c981ce412bf1c0324aeb0373f Mon Sep 17 00:00:00 2001
From: Salomon Popp
Date: Thu, 12 Oct 2023 14:56:05 +0200
Subject: [PATCH 3/4] Enable transparent OS environment lookups from internal
 environment (#368)

---
 kpops/utils/environment.py                    |  54 +++---
 .../pipeline-with-env-defaults/defaults.yaml  |   1 -
 tests/utils/test_environment.py               | 164 +++++++-----------
 3 files changed, 94 insertions(+), 125 deletions(-)

diff --git a/kpops/utils/environment.py b/kpops/utils/environment.py
index 0ed7ae920..fdabf1703 100644
--- a/kpops/utils/environment.py
+++ b/kpops/utils/environment.py
@@ -1,36 +1,42 @@
 import os
-import platform
 from collections import UserDict
-from collections.abc import Callable
+from collections.abc import ItemsView, KeysView, MutableMapping, ValuesView


-class Environment(UserDict):
-    def __init__(self, mapping=None, /, **kwargs) -> None:
-        transformation = Environment.__get_transformation()
-        if mapping is not None:
-            mapping = {transformation(key): value for key, value in mapping.items()}
-        else:
+class Environment(UserDict[str, str]):
+    """Internal environment wrapping OS environment."""
+
+    def __init__(
+        self, mapping: MutableMapping[str, str] | None = None, /, **kwargs: str
+    ) -> None:
+        self._global = os.environ
+        if mapping is None:
             mapping = {}
         if kwargs:
-            mapping.update(
-                {transformation(key): value for key, value in kwargs.items()}
-            )
+            mapping.update(**kwargs)
         super().__init__(mapping)

-    @staticmethod
-    def __key_camel_case_transform(key: str) -> str:
-        return key.lower()
+    def __getitem__(self, key: str) -> str:
+        try:
+            return self.data[key]
+        except KeyError:
+            return self._global[key]
+
+    def __contains__(self, key: object) -> bool:
+        return super().__contains__(key) or self._global.__contains__(key)
+
+    @property
+    def _dict(self) -> dict[str, str]:
+        return {**self._global, **self.data}
+
+    def keys(self) -> KeysView[str]:
+        return KeysView(self._dict)

-    @staticmethod
-    def __key_identity_transform(key: str) -> str:
-        return key
+    def values(self) -> ValuesView[str]:
+        return ValuesView(self._dict)

-    @staticmethod
-    def __get_transformation() -> Callable[[str], str]:
-        if platform.system() == "Windows":
-            return Environment.__key_camel_case_transform
-        else:
-            return Environment.__key_identity_transform
+    def items(self) -> ItemsView[str, str]:
+        return ItemsView(self._dict)


-ENV = Environment(os.environ)
+ENV = Environment()
diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml
index 639c176e5..8faa86ad5 100644
--- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml
+++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml
@@ -34,6 +34,5 @@ kafka-connector:
     key.ignore: "false"
     linger.ms: "5000"
     max.buffered.records: "20000"
-    name: "sink-connector"
     read.timeout.ms: "120000"
     tasks.max: "1"
diff --git a/tests/utils/test_environment.py b/tests/utils/test_environment.py
index 8fc02c826..d1dcb8f5e 100644
--- a/tests/utils/test_environment.py
+++ b/tests/utils/test_environment.py
@@ -1,117 +1,81 @@
-from unittest.mock import patch
+import os
+from collections.abc import ItemsView, KeysView, ValuesView
+from unittest.mock import ANY

 import pytest

 from kpops.utils.environment import Environment


-@pytest.fixture()
-def fake_environment_windows():
-    return {"MY": "fake", "ENVIRONMENT": "here"}
+@pytest.fixture(autouse=True)
+def environment(monkeypatch: pytest.MonkeyPatch) -> Environment:
+    for key in os.environ:
+        monkeypatch.delenv(key)
+    monkeypatch.setenv("MY", "fake")
+    monkeypatch.setenv("ENVIRONMENT", "here")
+    return Environment()


-@pytest.fixture()
-def fake_environment_linux():
-    return {"my": "fake", "environment": "here"}
+def test_get_item(environment: Environment):
+    assert environment["MY"] == "fake"
+    assert environment["ENVIRONMENT"] == "here"


-@patch("platform.system")
-def test_normal_behaviour_get_item(system, fake_environment_linux):
-    system.return_value = "Linux"
-    environment = Environment(fake_environment_linux)
-
-    assert environment["my"] == "fake"
-    assert environment["environment"] == "here"
-
-
-@patch("platform.system")
-def test_normal_behaviour_get_item_as_kwargs(system, fake_environment_linux):
-    system.return_value = "Linux"
-    environment = Environment(**fake_environment_linux)
-
-    assert environment["my"] == "fake"
-    assert environment["environment"] == "here"
-
-
-@patch("platform.system")
-def test_normal_behaviour_keys_transformation(system, fake_environment_linux):
-    system.return_value = "Linux"
-    environment = Environment(fake_environment_linux)
-    keys = set(environment.keys())
-
-    assert "my" in keys
-    assert "environment" in keys
-
-
-@patch("platform.system")
-def test_normal_behaviour_set_key(system, fake_environment_linux):
-    system.return_value = "Linux"
-    environment = Environment(fake_environment_linux)
+def test_set_item(environment: Environment):
     environment["extra"] = "key"

     keys = set(environment.keys())
-    assert "my" in keys
-    assert "environment" in keys
+    assert "MY" in keys
+    assert "ENVIRONMENT" in keys
     assert "extra" in keys
     assert environment["extra"] == "key"


-@patch("platform.system")
-def test_windows_behaviour_set_key(system, fake_environment_windows):
-    system.return_value = "Windows"
-    environment = Environment(fake_environment_windows)
-    environment["extra"] = "key"
-
-    keys = set(environment.keys())
-    assert "my" in keys
-    assert "environment" in keys
-    assert "extra" in keys
-    assert environment["extra"] == "key"
-
-
-@patch("platform.system")
-def test_normal_behaviour_keys_transformation_kwargs(system, fake_environment_linux):
-    system.return_value = "Linux"
-    environment = Environment(**fake_environment_linux)
-
-    keys = set(environment.keys())
-    assert "my" in keys
-    assert "environment" in keys
-
-
-@patch("platform.system")
-def test_windows_behaviour_keys_transformation(system, fake_environment_windows):
-    system.return_value = "Windows"
-    environment = Environment(fake_environment_windows)
-
-    keys = set(environment.keys())
-    assert "my" in keys
-    assert "environment" in keys
-
-
-@patch("platform.system")
-def test_windows_behaviour_keys_transformation_as_kwargs(
-    system, fake_environment_windows
-):
-    system.return_value = "Windows"
-    environment = Environment(**fake_environment_windows)
-    keys = set(environment.keys())
-    assert "my" in keys
-    assert "environment" in keys
-
-
-@patch("platform.system")
-def test_windows_behaviour_get_item(system, fake_environment_windows):
-    system.return_value = "Windows"
-
-    environment = Environment(fake_environment_windows)
-    assert environment["my"] == "fake"
-    assert environment["environment"] == "here"
-
-
-@patch("platform.system")
-def test_windows_behaviour_get_item_as_kwargs(system, fake_environment_windows):
-    system.return_value = "Windows"
-    environment = Environment(**fake_environment_windows)
-    assert environment["my"] == "fake"
-    assert environment["environment"] == "here"
+def test_update_os_environ(environment: Environment):
+    with pytest.raises(KeyError):
+        environment["TEST"]
+    os.environ["TEST"] = "test"
+    assert "TEST" in environment
+    assert environment["TEST"] == "test"
+    keys = environment.keys()
+    assert isinstance(keys, KeysView)
+    assert "TEST" in keys
+    values = environment.values()
+    assert isinstance(values, ValuesView)
+    assert "test" in values
+    items = environment.items()
+    assert isinstance(items, ItemsView)
+    d = dict(items)
+    assert d["TEST"] == "test"
+
+
+def test_mapping():
+    environment = Environment({"kwarg1": "value1", "kwarg2": "value2"})
+    assert environment["MY"] == "fake"
+    assert environment["ENVIRONMENT"] == "here"
+    assert environment["kwarg1"] == "value1"
+    assert environment["kwarg2"] == "value2"
+
+
+def test_kwargs():
+    environment = Environment(kwarg1="value1", kwarg2="value2")
+    assert environment["MY"] == "fake"
+    assert environment["ENVIRONMENT"] == "here"
+    assert environment["kwarg1"] == "value1"
+    assert environment["kwarg2"] == "value2"
+
+
+def test_dict(environment: Environment):
+    assert environment._dict == {
+        "MY": "fake",
+        "ENVIRONMENT": "here",
+        "PYTEST_CURRENT_TEST": ANY,
+    }
+
+
+def test_dict_unpacking(environment: Environment):
+    assert {**environment} == {
+        "MY": "fake",
+        "ENVIRONMENT": "here",
+        "PYTEST_CURRENT_TEST": ANY,
+    }

From 6ca008150ff468fb2f7e6274919c63e1a8b4a1a6 Mon Sep 17 00:00:00 2001
From: bakdata-bots
Date: Thu, 12 Oct 2023 12:57:43 +0000
Subject: [PATCH 4/4] =?UTF-8?q?Bump=20version=202.0.9=20=E2=86=92=202.0.10?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 CHANGELOG.md      | 17 +++++++++++++++++
 kpops/__init__.py |  2 +-
 pyproject.toml    |  2 +-
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c16a9033b..402a6fe7e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,21 @@
 # Changelog
+## [2.0.10](https://github.com/bakdata/kpops/releases/tag/2.0.10) - Release Date: [2023-10-12]
+
+### 🌀 Miscellaneous
+
+- Fix environment variables documentation generation - [#362](https://github.com/bakdata/kpops/pull/362)
+
+- Introduce ruff - [#363](https://github.com/bakdata/kpops/pull/363)
+
+- Print details on connector name mismatch error - [#369](https://github.com/bakdata/kpops/pull/369)
+
+- Enable transparent OS environment lookups from internal environment - [#368](https://github.com/bakdata/kpops/pull/368)
+
+
+
+
+
+
 ## [2.0.9](https://github.com/bakdata/kpops/releases/tag/2.0.9) - Release Date: [2023-09-19]

 ### 🐛 Fixes
diff --git a/kpops/__init__.py b/kpops/__init__.py
index 20b037690..70fbe94a4 100644
--- a/kpops/__init__.py
+++ b/kpops/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "2.0.9"
+__version__ = "2.0.10"

 # export public API functions
 from kpops.cli.main import clean, deploy, destroy, generate, reset
diff --git a/pyproject.toml b/pyproject.toml
index 7e0427eda..64b573128 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "kpops"
-version = "2.0.9"
+version = "2.0.10"
 description = "KPOps is a tool to deploy Kafka pipelines to Kubernetes"
 authors = ["bakdata "]
 license = "MIT"
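
-- 
Usage sketch for the reworked `Environment` (PATCH 3/4): lookups hit the internal mapping first and only then fall back to `os.environ`, while `keys()`/`values()`/`items()` expose the merged view. The snippet below is a minimal illustration of those semantics, not part of the patch series; `DEMO_VAR` and the example values are hypothetical names used only for demonstration.

    import os

    from kpops.utils.environment import Environment

    # values set on the internal environment shadow the OS environment
    env = Environment({"pipeline_name": "demo-pipeline"})
    assert env["pipeline_name"] == "demo-pipeline"

    # keys missing from the internal mapping fall through to os.environ
    os.environ["DEMO_VAR"] = "from-os"  # DEMO_VAR is illustrative only
    assert env["DEMO_VAR"] == "from-os"
    assert "DEMO_VAR" in env  # __contains__ consults both mappings

    # the view methods merge os.environ with the internal overrides
    assert "DEMO_VAR" in env.keys()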