Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix early exit upon Helm exit code 1 #376

Merged
merged 11 commits into from
Oct 23, 2023
4 changes: 2 additions & 2 deletions examples/bakdata/atm-fraud-detection/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ topic_name_config:
default_error_topic_name: "${pipeline_name}-${component_name}-dead-letter-topic"
default_output_topic_name: "${pipeline_name}-${component_name}-topic"

brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
brokers: "http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092"
sujuka99 marked this conversation as resolved.
Show resolved Hide resolved

schema_registry_url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081"
schema_registry_url: "http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081"

kafka_rest_host: "http://localhost:8082"

Expand Down
4 changes: 2 additions & 2 deletions examples/bakdata/atm-fraud-detection/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@
connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max: 1
topics: ${pipeline_name}-account-linker-topic
connection.url: jdbc:postgresql://postgresql-dev.kpops.svc.cluster.local:5432/app_db
connection.url: jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db
connection.user: app1
connection.password: AppPassword
connection.ds.pool.size: 5
insert.mode: insert
insert.mode.databaselevel: true
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081
value.converter.schema.registry.url: http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081
key.converter: org.apache.kafka.connect.storage.StringConverter
transforms: flatten
transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value
Expand Down
9 changes: 3 additions & 6 deletions kpops/cli/pipeline_config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from __future__ import annotations

from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any

from pydantic import BaseConfig, BaseSettings, Field
from pydantic.env_settings import SettingsSourceCallable

from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig
from kpops.utils.yaml_loading import load_yaml_file

if TYPE_CHECKING:
from collections.abc import Callable

from pydantic.env_settings import SettingsSourceCallable

ENV_PREFIX = "KPOPS_"


Expand Down
2 changes: 1 addition & 1 deletion kpops/component_handlers/helm_wrapper/helm.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def __execute(self, command: list[str]) -> str:
log.debug(f"Executing {' '.join(command)}")
process = subprocess.run(
command,
check=True,
check=False,
capture_output=True,
text=True,
)
Expand Down
6 changes: 2 additions & 4 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from abc import ABC
from functools import cached_property
from typing import TYPE_CHECKING, Any, NoReturn
from typing import Any, NoReturn

from pydantic import Field, validator
from typing_extensions import override
Expand All @@ -25,13 +25,11 @@
KafkaConnectResetterValues,
)
from kpops.components.base_components.base_defaults_component import deduplicate
from kpops.components.base_components.models.from_section import FromTopic
from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.utils.colorify import magentaify
from kpops.utils.docstring import describe_attr

if TYPE_CHECKING:
from kpops.components.base_components.models.from_section import FromTopic

log = logging.getLogger("KafkaConnector")


Expand Down
38 changes: 19 additions & 19 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pytest-mock = "^3.10.0"
pytest-timeout = "^2.1.0"
snapshottest = "^0.6.0"
pre-commit = "^2.19.0"
ruff = "^0.0.292"
ruff = "^0.1.1"
black = "^23.7.0"
typer-cli = "^0.0.13"
pyright = "^1.1.314"
Expand Down Expand Up @@ -135,7 +135,6 @@ select = [
"RET", # flake8-return
"SLOT", # flake8-slots
"SIM", # flake8-simplify
"TCH", # flake8-type-checking, configure correctly and add
"PTH", # flake8-use-pathlib
"PGH", # pygrep-hooks
"PL", # Pylint
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import os
from collections.abc import Iterator
from unittest import mock

import pytest


@pytest.fixture()
def mock_env() -> Iterator[os._Environ[str]]:
"""Clear ``os.environ``.

:yield: ``os.environ``. Prevents the function and the mock
context from exiting.
"""
with mock.patch.dict(os.environ, clear=True):
yield os.environ
24 changes: 12 additions & 12 deletions tests/pipeline/snapshots/snap_test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
'replicaCount': 1,
'schedule': '0 12 * * *',
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'extraOutputTopics': {
},
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-account-producer-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'suspend': True
},
Expand Down Expand Up @@ -74,12 +74,12 @@
'replicaCount': 1,
'schedule': '0 12 * * *',
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'extraOutputTopics': {
},
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-transaction-avro-producer-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'suspend': True
},
Expand Down Expand Up @@ -129,14 +129,14 @@
},
'replicaCount': 1,
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-transaction-joiner-dead-letter-topic',
'inputTopics': [
'bakdata-atm-fraud-detection-transaction-avro-producer-topic'
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-transaction-joiner-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'name': 'transaction-joiner',
Expand Down Expand Up @@ -191,14 +191,14 @@
},
'replicaCount': 1,
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-fraud-detector-dead-letter-topic',
'inputTopics': [
'bakdata-atm-fraud-detection-transaction-joiner-topic'
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-fraud-detector-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'name': 'fraud-detector',
Expand Down Expand Up @@ -253,7 +253,7 @@
},
'replicaCount': 1,
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-account-linker-dead-letter-topic',
'extraInputTopics': {
'accounts': [
Expand All @@ -265,7 +265,7 @@
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-account-linker-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'from': {
Expand Down Expand Up @@ -315,7 +315,7 @@
'auto.create': True,
'connection.ds.pool.size': 5,
'connection.password': 'AppPassword',
'connection.url': 'jdbc:postgresql://postgresql-dev.kpops.svc.cluster.local:5432/app_db',
'connection.url': 'jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db',
'connection.user': 'app1',
'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector',
'errors.deadletterqueue.context.headers.enable': True,
Expand All @@ -333,7 +333,7 @@
'transforms': 'flatten',
'transforms.flatten.type': 'org.apache.kafka.connect.transforms.Flatten$Value',
'value.converter': 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'name': 'postgresql-connector',
'namespace': '${NAMESPACE}',
Expand Down
2 changes: 2 additions & 0 deletions tests/pipeline/test_example.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
import yaml
from snapshottest.module import SnapshotTest
from typer.testing import CliRunner
Expand All @@ -7,6 +8,7 @@
runner = CliRunner()


@pytest.mark.usefixtures("mock_env")
class TestExample:
def test_atm_fraud(self, snapshot: SnapshotTest):
result = runner.invoke(
Expand Down
1 change: 1 addition & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
PIPELINE_BASE_DIR_PATH = RESOURCE_PATH.parent


@pytest.mark.usefixtures("mock_env")
class TestPipeline:
def test_python_api(self):
pipeline = kpops.generate(
Expand Down