Fix early exit upon Helm exit code 1 (#376)

fixes #373 

### Fix

> I am getting this error when I try to use dry-run:
> `subprocess.CalledProcessError: Command '['helm', 'get', 'manifest', 'account-producer', '--namespace', 'my-namespace']' returned non-zero exit status 1.`
> Why? Does KPOps run it first even in dry-run? When I run that command manually I get:
> `To learn more, consult https://cloud.google.com/blog/products/containers-kubernetes/kubectl-auth-changes-in-gke`
> `Error: release: not found`

The [problem was introduced](2.0.9...2.0.10#diff-1ca7465daf9238dfe221304df636d96704ebf4935288b854a5c4c6b3ea9e3162R215) by calling `subprocess.run(..., check=True)` without handling the `CalledProcessError` it raises when Helm exits with a non-zero status.
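The change flips `check` to `False` so the wrapper can inspect the Helm result itself instead of letting `subprocess` raise. A minimal sketch of that pattern, using a hypothetical `run_helm` helper and `ReleaseNotFoundException` rather than the actual code in `kpops/component_handlers/helm_wrapper/helm.py`:

```python
import subprocess


class ReleaseNotFoundException(Exception):
    """Hypothetical error type for the expected 'release: not found' case."""


def run_helm(command: list[str]) -> str:
    # check=False: don't let subprocess raise CalledProcessError for us;
    # decide from the return code and stderr how to react instead.
    process = subprocess.run(
        command,
        check=False,
        capture_output=True,
        text=True,
    )
    if process.returncode != 0:
        if "release: not found" in process.stderr:
            # Expected e.g. before the first deployment; callers can handle
            # this case gracefully instead of the whole run crashing.
            raise ReleaseNotFoundException(process.stderr)
        raise RuntimeError(f"Helm command failed: {process.stderr.strip()}")
    return process.stdout
```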


### Preventive measures

I moved some imports out of `TYPE_CHECKING` blocks to avoid problems with `pydantic`, which evaluates type hints at runtime.
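For illustration only (this model is not from the repository), hiding a pydantic field's annotation behind `TYPE_CHECKING` breaks at runtime:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

from pydantic import BaseModel

if TYPE_CHECKING:
    # Visible to static type checkers only; never imported at runtime.
    from pathlib import Path


class Example(BaseModel):
    # pydantic must resolve the "Path" annotation at runtime to build this
    # field, but the name is undefined here, so building or using the model
    # fails (an undefined-annotation / forward-reference error, depending on
    # the pydantic version).
    workdir: Path
```

Keeping such imports at module level costs a little import time but guarantees pydantic can resolve the annotation.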


### Ruff

Updated Ruff to the latest version (`0.1.1`). Fixes that Ruff deems unsafe are no longer applied automatically, even when a fix is available; they can be enabled explicitly with the `--unsafe-fixes` flag.


### Example

While testing I noticed that the namespace environment variable could be used in a few more places in the example, so I substituted it there and adjusted the tests accordingly.
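For context, placeholders like `${NAMESPACE}` are expected to be filled in from the environment when the YAML is processed. A rough sketch of that kind of substitution with `string.Template` — not KPOps' actual loader, which may work differently:

```python
import os
import string

import yaml  # PyYAML, assumed to be available


def load_with_env(raw: str) -> dict:
    """Replace ${VAR} placeholders from the environment, then parse the YAML."""
    return yaml.safe_load(string.Template(raw).substitute(os.environ))


os.environ["NAMESPACE"] = "my-namespace"
config = load_with_env(
    'brokers: "http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092"'
)
print(config["brokers"])
# -> http://k8kafka-cp-kafka-headless.my-namespace.svc.cluster.local:9092
```

This is presumably also why the tests now clear `os.environ` through the new `mock_env` fixture: with no stray variables set, the rendered snapshots stay deterministic.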
sujuka99 authored Oct 23, 2023
1 parent 6ca0081 commit 352cf9c
Showing 11 changed files with 61 additions and 48 deletions.
4 changes: 2 additions & 2 deletions examples/bakdata/atm-fraud-detection/config.yaml
@@ -4,9 +4,9 @@ topic_name_config:
default_error_topic_name: "${pipeline_name}-${component_name}-dead-letter-topic"
default_output_topic_name: "${pipeline_name}-${component_name}-topic"

-brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
+brokers: "http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092"

-schema_registry_url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081"
+schema_registry_url: "http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081"

kafka_rest_host: "http://localhost:8082"

4 changes: 2 additions & 2 deletions examples/bakdata/atm-fraud-detection/pipeline.yaml
@@ -84,14 +84,14 @@
connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max: 1
topics: ${pipeline_name}-account-linker-topic
-connection.url: jdbc:postgresql://postgresql-dev.kpops.svc.cluster.local:5432/app_db
+connection.url: jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db
connection.user: app1
connection.password: AppPassword
connection.ds.pool.size: 5
insert.mode: insert
insert.mode.databaselevel: true
value.converter: io.confluent.connect.avro.AvroConverter
-value.converter.schema.registry.url: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081
+value.converter.schema.registry.url: http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081
key.converter: org.apache.kafka.connect.storage.StringConverter
transforms: flatten
transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value
9 changes: 3 additions & 6 deletions kpops/cli/pipeline_config.py
@@ -1,18 +1,15 @@
from __future__ import annotations

+from collections.abc import Callable
from pathlib import Path
-from typing import TYPE_CHECKING, Any
+from typing import Any

from pydantic import BaseConfig, BaseSettings, Field
+from pydantic.env_settings import SettingsSourceCallable

from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig
from kpops.utils.yaml_loading import load_yaml_file

-if TYPE_CHECKING:
-    from collections.abc import Callable
-
-    from pydantic.env_settings import SettingsSourceCallable

ENV_PREFIX = "KPOPS_"


2 changes: 1 addition & 1 deletion kpops/component_handlers/helm_wrapper/helm.py
@@ -212,7 +212,7 @@ def __execute(self, command: list[str]) -> str:
        log.debug(f"Executing {' '.join(command)}")
        process = subprocess.run(
            command,
-            check=True,
+            check=False,
            capture_output=True,
            text=True,
        )
6 changes: 2 additions & 4 deletions kpops/components/base_components/kafka_connector.py
@@ -3,7 +3,7 @@
import logging
from abc import ABC
from functools import cached_property
-from typing import TYPE_CHECKING, Any, NoReturn
+from typing import Any, NoReturn

from pydantic import Field, validator
from typing_extensions import override
@@ -25,13 +25,11 @@
    KafkaConnectResetterValues,
)
from kpops.components.base_components.base_defaults_component import deduplicate
+from kpops.components.base_components.models.from_section import FromTopic
from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.utils.colorify import magentaify
from kpops.utils.docstring import describe_attr

-if TYPE_CHECKING:
-    from kpops.components.base_components.models.from_section import FromTopic
-
log = logging.getLogger("KafkaConnector")


38 changes: 19 additions & 19 deletions poetry.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions pyproject.toml
@@ -45,7 +45,7 @@ pytest-mock = "^3.10.0"
pytest-timeout = "^2.1.0"
snapshottest = "^0.6.0"
pre-commit = "^2.19.0"
-ruff = "^0.0.292"
+ruff = "^0.1.1"
black = "^23.7.0"
typer-cli = "^0.0.13"
pyright = "^1.1.314"
@@ -135,7 +135,6 @@ select = [
"RET", # flake8-return
"SLOT", # flake8-slots
"SIM", # flake8-simplify
"TCH", # flake8-type-checking, configure correctly and add
"PTH", # flake8-use-pathlib
"PGH", # pygrep-hooks
"PL", # Pylint
16 changes: 16 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,16 @@
import os
from collections.abc import Iterator
from unittest import mock

import pytest


@pytest.fixture()
def mock_env() -> Iterator[os._Environ[str]]:
"""Clear ``os.environ``.
:yield: ``os.environ``. Prevents the function and the mock
context from exiting.
"""
with mock.patch.dict(os.environ, clear=True):
yield os.environ
24 changes: 12 additions & 12 deletions tests/pipeline/snapshots/snap_test_example.py
@@ -23,12 +23,12 @@
'replicaCount': 1,
'schedule': '0 12 * * *',
'streams': {
-'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
+'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'extraOutputTopics': {
},
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-account-producer-topic',
-'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
+'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'suspend': True
},
@@ -74,12 +74,12 @@
'replicaCount': 1,
'schedule': '0 12 * * *',
'streams': {
-'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
+'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'extraOutputTopics': {
},
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-transaction-avro-producer-topic',
-'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
+'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'suspend': True
},
@@ -129,14 +129,14 @@
},
'replicaCount': 1,
'streams': {
-'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
+'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-transaction-joiner-dead-letter-topic',
'inputTopics': [
'bakdata-atm-fraud-detection-transaction-avro-producer-topic'
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-transaction-joiner-topic',
-'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
+'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'name': 'transaction-joiner',
@@ -191,14 +191,14 @@
},
'replicaCount': 1,
'streams': {
-'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
+'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-fraud-detector-dead-letter-topic',
'inputTopics': [
'bakdata-atm-fraud-detection-transaction-joiner-topic'
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-fraud-detector-topic',
-'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
+'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'name': 'fraud-detector',
@@ -253,7 +253,7 @@
},
'replicaCount': 1,
'streams': {
-'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
+'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-account-linker-dead-letter-topic',
'extraInputTopics': {
'accounts': [
@@ -265,7 +265,7 @@
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-account-linker-topic',
-'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
+'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'from': {
@@ -315,7 +315,7 @@
'auto.create': True,
'connection.ds.pool.size': 5,
'connection.password': 'AppPassword',
-'connection.url': 'jdbc:postgresql://postgresql-dev.kpops.svc.cluster.local:5432/app_db',
+'connection.url': 'jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db',
'connection.user': 'app1',
'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector',
'errors.deadletterqueue.context.headers.enable': True,
@@ -333,7 +333,7 @@
'transforms': 'flatten',
'transforms.flatten.type': 'org.apache.kafka.connect.transforms.Flatten$Value',
'value.converter': 'io.confluent.connect.avro.AvroConverter',
-'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
+'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'name': 'postgresql-connector',
'namespace': '${NAMESPACE}',
2 changes: 2 additions & 0 deletions tests/pipeline/test_example.py
@@ -1,3 +1,4 @@
+import pytest
import yaml
from snapshottest.module import SnapshotTest
from typer.testing import CliRunner
@@ -7,6 +8,7 @@
runner = CliRunner()


+@pytest.mark.usefixtures("mock_env")
class TestExample:
    def test_atm_fraud(self, snapshot: SnapshotTest):
        result = runner.invoke(
1 change: 1 addition & 0 deletions tests/pipeline/test_pipeline.py
@@ -16,6 +16,7 @@
PIPELINE_BASE_DIR_PATH = RESOURCE_PATH.parent


+@pytest.mark.usefixtures("mock_env")
class TestPipeline:
    def test_python_api(self):
        pipeline = kpops.generate(
