Skip to content

Commit

Permalink
Save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
irux committed Oct 23, 2023
2 parents 0ce9044 + 352cf9c commit 542c16a
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 31 deletions.
4 changes: 2 additions & 2 deletions examples/bakdata/atm-fraud-detection/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ topic_name_config:
default_error_topic_name: "${pipeline_name}-${component_name}-dead-letter-topic"
default_output_topic_name: "${pipeline_name}-${component_name}-topic"

brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
brokers: "http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092"

schema_registry_url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081"
schema_registry_url: "http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081"

kafka_rest_host: "http://localhost:8082"

Expand Down
4 changes: 2 additions & 2 deletions examples/bakdata/atm-fraud-detection/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@
connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max: 1
topics: ${pipeline_name}-account-linker-topic
connection.url: jdbc:postgresql://postgresql-dev.kpops.svc.cluster.local:5432/app_db
connection.url: jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db
connection.user: app1
connection.password: AppPassword
connection.ds.pool.size: 5
insert.mode: insert
insert.mode.databaselevel: true
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081
value.converter.schema.registry.url: http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081
key.converter: org.apache.kafka.connect.storage.StringConverter
transforms: flatten
transforms.flatten.type: org.apache.kafka.connect.transforms.Flatten$Value
Expand Down
9 changes: 3 additions & 6 deletions kpops/cli/pipeline_config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from __future__ import annotations

from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any

from pydantic import BaseConfig, BaseSettings, Field
from pydantic.env_settings import SettingsSourceCallable

from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig
from kpops.utils.yaml_loading import load_yaml_file

if TYPE_CHECKING:
from collections.abc import Callable

from pydantic.env_settings import SettingsSourceCallable

ENV_PREFIX = "KPOPS_"


Expand Down
2 changes: 1 addition & 1 deletion kpops/component_handlers/helm_wrapper/helm.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def __execute(self, command: list[str]) -> str:
log.debug(f"Executing {' '.join(command)}")
process = subprocess.run(
command,
check=True,
check=False,
capture_output=True,
text=True,
)
Expand Down
6 changes: 2 additions & 4 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from abc import ABC
from functools import cached_property
from typing import TYPE_CHECKING, Any, NoReturn
from typing import Any, NoReturn

from pydantic import Field, validator
from typing_extensions import override
Expand All @@ -25,13 +25,11 @@
KafkaConnectResetterValues,
)
from kpops.components.base_components.base_defaults_component import deduplicate
from kpops.components.base_components.models.from_section import FromTopic
from kpops.components.base_components.pipeline_component import PipelineComponent
from kpops.utils.colorify import magentaify
from kpops.utils.docstring import describe_attr

if TYPE_CHECKING:
from kpops.components.base_components.models.from_section import FromTopic

log = logging.getLogger("KafkaConnector")


Expand Down
30 changes: 28 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pytest-mock = "^3.10.0"
pytest-timeout = "^2.1.0"
snapshottest = "^0.6.0"
pre-commit = "^2.19.0"
ruff = "^0.0.292"
ruff = "^0.1.1"
black = "^23.7.0"
typer-cli = "^0.0.13"
pyright = "^1.1.314"
Expand Down Expand Up @@ -140,7 +140,6 @@ select = [
"RET", # flake8-return
"SLOT", # flake8-slots
"SIM", # flake8-simplify
"TCH", # flake8-type-checking, configure correctly and add
"PTH", # flake8-use-pathlib
"PGH", # pygrep-hooks
"PL", # Pylint
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import os
from collections.abc import Iterator
from unittest import mock

import pytest


@pytest.fixture()
def mock_env() -> Iterator[os._Environ[str]]:
"""Clear ``os.environ``.
:yield: ``os.environ``. Prevents the function and the mock
context from exiting.
"""
with mock.patch.dict(os.environ, clear=True):
yield os.environ
24 changes: 12 additions & 12 deletions tests/pipeline/snapshots/snap_test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
'replicaCount': 1,
'schedule': '0 12 * * *',
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'extraOutputTopics': {
},
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-account-producer-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'suspend': True
},
Expand Down Expand Up @@ -74,12 +74,12 @@
'replicaCount': 1,
'schedule': '0 12 * * *',
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'extraOutputTopics': {
},
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-transaction-avro-producer-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'suspend': True
},
Expand Down Expand Up @@ -129,14 +129,14 @@
},
'replicaCount': 1,
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-transaction-joiner-dead-letter-topic',
'inputTopics': [
'bakdata-atm-fraud-detection-transaction-avro-producer-topic'
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-transaction-joiner-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'name': 'transaction-joiner',
Expand Down Expand Up @@ -191,14 +191,14 @@
},
'replicaCount': 1,
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-fraud-detector-dead-letter-topic',
'inputTopics': [
'bakdata-atm-fraud-detection-transaction-joiner-topic'
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-fraud-detector-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'name': 'fraud-detector',
Expand Down Expand Up @@ -253,7 +253,7 @@
},
'replicaCount': 1,
'streams': {
'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092',
'brokers': 'http://k8kafka-cp-kafka-headless.${NAMESPACE}.svc.cluster.local:9092',
'errorTopic': 'bakdata-atm-fraud-detection-account-linker-dead-letter-topic',
'extraInputTopics': {
'accounts': [
Expand All @@ -265,7 +265,7 @@
],
'optimizeLeaveGroupBehavior': False,
'outputTopic': 'bakdata-atm-fraud-detection-account-linker-topic',
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'schemaRegistryUrl': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
}
},
'from': {
Expand Down Expand Up @@ -315,7 +315,7 @@
'auto.create': True,
'connection.ds.pool.size': 5,
'connection.password': 'AppPassword',
'connection.url': 'jdbc:postgresql://postgresql-dev.kpops.svc.cluster.local:5432/app_db',
'connection.url': 'jdbc:postgresql://postgresql-dev.${NAMESPACE}.svc.cluster.local:5432/app_db',
'connection.user': 'app1',
'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector',
'errors.deadletterqueue.context.headers.enable': True,
Expand All @@ -333,7 +333,7 @@
'transforms': 'flatten',
'transforms.flatten.type': 'org.apache.kafka.connect.transforms.Flatten$Value',
'value.converter': 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081'
'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081'
},
'name': 'postgresql-connector',
'namespace': '${NAMESPACE}',
Expand Down
2 changes: 2 additions & 0 deletions tests/pipeline/test_example.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
import yaml
from snapshottest.module import SnapshotTest
from typer.testing import CliRunner
Expand All @@ -7,6 +8,7 @@
runner = CliRunner()


@pytest.mark.usefixtures("mock_env")
class TestExample:
def test_atm_fraud(self, snapshot: SnapshotTest):
result = runner.invoke(
Expand Down
1 change: 1 addition & 0 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
PIPELINE_BASE_DIR_PATH = RESOURCE_PATH.parent


@pytest.mark.usefixtures("mock_env")
class TestPipeline:
def test_python_api(self):
pipeline = kpops.generate(
Expand Down

0 comments on commit 542c16a

Please sign in to comment.