diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ecaacac42..025ed524a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -42,6 +42,9 @@ jobs: - name: Typing (mypy) run: poetry run pre-commit run mypy --all-files + - name: Typing (pyright) + run: poetry run pre-commit run pyright --all-files + - name: Generate schema (kpops schema) run: poetry run pre-commit run gen-schema --all-files --show-diff-on-failure diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0af74cadc..b2a38235a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -26,6 +26,15 @@ repos: types: [python] require_serial: true # run once for all files exclude: ^tests/.*snapshots/ + - repo: local + hooks: + - id: pyright + name: pyright + entry: pyright + language: system + types: [python] + require_serial: true # run once for all files + exclude: ^tests/.*snapshots/ - repo: https://github.com/asottile/pyupgrade rev: v3.1.0 hooks: diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 247b8e053..01e945492 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -5,6 +5,7 @@ from functools import cached_property from schema_registry.client import SchemaRegistryClient +from schema_registry.client.schema import AvroSchema from kpops.cli.exception import ClassNotFoundError from kpops.cli.pipeline_config import PipelineConfig @@ -32,7 +33,7 @@ def schema_provider(self) -> SchemaProvider: f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." ) schema_provider_class = find_class(self.components_module, SchemaProvider) # type: ignore[type-abstract] - return schema_provider_class() + return schema_provider_class() # pyright: ignore[reportGeneralTypeIssues] except ClassNotFoundError: raise ValueError( f"No schema provider found in components module {self.components_module}. " @@ -138,8 +139,13 @@ def __check_compatibility( if not self.schema_registry_client.test_compatibility( subject=subject, schema=schema ): + schema_str = ( + schema.flat_schema + if isinstance(schema, AvroSchema) + else str(schema) + ) raise Exception( - f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema.flat_schema, indent=4)}" + f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}" ) else: log.debug( diff --git a/kpops/component_handlers/topic/handler.py b/kpops/component_handlers/topic/handler.py index 40fbf5706..1df0d106a 100644 --- a/kpops/component_handlers/topic/handler.py +++ b/kpops/component_handlers/topic/handler.py @@ -27,8 +27,7 @@ def __init__(self, proxy_wrapper: ProxyWrapper): self.proxy_wrapper = proxy_wrapper def create_topics(self, to_section: ToSection, dry_run: bool) -> None: - topics: dict[str, TopicConfig] = to_section.topics - for topic_name, topic_config in topics.items(): + for topic_name, topic_config in to_section.topics.items(): topic_spec = self.__prepare_body(topic_name, topic_config) if dry_run: self.__dry_run_topic_creation(topic_name, topic_spec, topic_config) @@ -66,8 +65,7 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: self.proxy_wrapper.create_topic(topic_spec=topic_spec) def delete_topics(self, to_section: ToSection, dry_run: bool) -> None: - topics: dict[str, TopicConfig] = to_section.topics - for topic_name in topics.keys(): + for topic_name in to_section.topics.keys(): if dry_run: self.__dry_run_topic_deletion(topic_name=topic_name) else: diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index e87c13f54..a55d11dde 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -89,9 +89,6 @@ class KafkaConnector(PipelineComponent, ABC): class Config(CamelCaseConfig): pass - def __init__(self, **kwargs) -> None: - super().__init__(**kwargs) - @cached_property def helm(self) -> Helm: """Helm object that contains component-specific config such as repo""" diff --git a/kpops/components/base_components/models/__init__.py b/kpops/components/base_components/models/__init__.py index e69de29bb..c7085b870 100644 --- a/kpops/components/base_components/models/__init__.py +++ b/kpops/components/base_components/models/__init__.py @@ -0,0 +1,3 @@ +from typing import NewType + +TopicName = NewType("TopicName", str) diff --git a/kpops/components/base_components/models/from_section.py b/kpops/components/base_components/models/from_section.py index 2b9674c74..eae550557 100644 --- a/kpops/components/base_components/models/from_section.py +++ b/kpops/components/base_components/models/from_section.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, Extra, Field, root_validator +from kpops.components.base_components.models import TopicName from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import DescConfig @@ -54,7 +55,6 @@ def extra_topic_role(cls, values: dict) -> dict: return values -TopicName = NewType("TopicName", str) ComponentName = NewType("ComponentName", str) diff --git a/kpops/components/base_components/models/to_section.py b/kpops/components/base_components/models/to_section.py index 8ad868814..19dec144b 100644 --- a/kpops/components/base_components/models/to_section.py +++ b/kpops/components/base_components/models/to_section.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, Extra, Field, root_validator +from kpops.components.base_components.models import TopicName from kpops.utils.docstring import describe_attr from kpops.utils.pydantic import DescConfig @@ -89,7 +90,7 @@ class ToSection(BaseModel): models: dict[str, Any] = Field( default={}, description=describe_attr("models", __doc__) ) - topics: dict[str, TopicConfig] = Field( + topics: dict[TopicName, TopicConfig] = Field( ..., description=describe_attr("topics", __doc__) ) diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index 00bbced7f..1430f88a1 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -76,7 +76,7 @@ class Config(CamelCaseConfig, DescConfig): extra = Extra.allow keep_untouched = (cached_property,) - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: super().__init__(**kwargs) self.set_input_topics() self.set_output_topics() diff --git a/poetry.lock b/poetry.lock index bc80a39f3..8a55092d4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -933,6 +933,24 @@ files = [ markdown = ">=3.2" pyyaml = "*" +[[package]] +name = "pyright" +version = "1.1.314" +description = "Command line wrapper for pyright" +optional = false +python-versions = ">=3.7" +files = [ + {file = "pyright-1.1.314-py3-none-any.whl", hash = "sha256:5008a2e04b71e35c5f1b78b16adae9d012601197442ae6c798e9bb3456d1eecb"}, + {file = "pyright-1.1.314.tar.gz", hash = "sha256:bd104c206fe40eaf5f836efa9027f07cc0efcbc452e6d22dfae36759c5fd28b3"}, +] + +[package.dependencies] +nodeenv = ">=1.6.0" + +[package.extras] +all = ["twine (>=3.4.1)"] +dev = ["twine (>=3.4.1)"] + [[package]] name = "pyrsistent" version = "0.19.2" @@ -1620,4 +1638,4 @@ watchmedo = ["PyYAML (>=3.10)"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "fc1e50461f83005d3f32ef7fa8b37e071666274161cde78827766e434307d2e9" +content-hash = "961df9b75500b501859c1e649605c171bc13d8deeb46f00ac58dc7810d30bb1e" diff --git a/pyproject.toml b/pyproject.toml index 24fd95364..77d97f08d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ kpops = "kpops.cli.main:app" [tool.poetry.dependencies] python = "^3.10" -pydantic = {extras = ["dotenv"], version = "^1.10.8"} +pydantic = { extras = ["dotenv"], version = "^1.10.8" } rich = "^12.4.4" PyYAML = "^6.0" requests = "^2.28.0" @@ -50,6 +50,7 @@ flake8 = "^4.0.1" black = "^22.3.0" isort = "^5.12.0" typer-cli = "^0.0.13" +pyright = "^1.1.314" [tool.poetry.group.docs] optional = true diff --git a/tests/cli/resources/module.py b/tests/cli/resources/module.py index c5b3f360e..a0bd45c3f 100644 --- a/tests/cli/resources/module.py +++ b/tests/cli/resources/module.py @@ -1,5 +1,7 @@ from typing import Any +from schema_registry.client.schema import AvroSchema + from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, @@ -8,4 +10,4 @@ class CustomSchemaProvider(SchemaProvider): def provide_schema(self, schema_class: str, models: dict[str, Any]) -> Schema: - pass + return AvroSchema() diff --git a/tests/component_handlers/schema_handler/resources/module.py b/tests/component_handlers/schema_handler/resources/module.py index c5b3f360e..a0bd45c3f 100644 --- a/tests/component_handlers/schema_handler/resources/module.py +++ b/tests/component_handlers/schema_handler/resources/module.py @@ -1,5 +1,7 @@ from typing import Any +from schema_registry.client.schema import AvroSchema + from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, @@ -8,4 +10,4 @@ class CustomSchemaProvider(SchemaProvider): def provide_schema(self, schema_class: str, models: dict[str, Any]) -> Schema: - pass + return AvroSchema() diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py index 521beeea5..46448f12f 100644 --- a/tests/component_handlers/schema_handler/test_schema_handler.py +++ b/tests/component_handlers/schema_handler/test_schema_handler.py @@ -5,11 +5,13 @@ import pytest from pytest_mock import MockerFixture +from schema_registry.client.schema import AvroSchema from schema_registry.client.utils import SchemaVersion from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.schema_handler.schema_provider import SchemaProvider +from kpops.components.base_components.models import TopicName from kpops.components.base_components.models.to_section import ( OutputTopicTypes, TopicConfig, @@ -56,6 +58,21 @@ def schema_registry_mock(mocker: MockerFixture) -> MagicMock: return schema_registry_mock.return_value +@pytest.fixture() +def topic_config() -> TopicConfig: + return TopicConfig( + type=OutputTopicTypes.OUTPUT, + # pyright has no way of validating these aliased Pydantic fields because we're also using the allow_population_by_field_name setting + key_schema=None, # pyright: ignore[reportGeneralTypeIssues] + value_schema="com.bakdata.kpops.test.SchemaHandlerTest", # pyright: ignore[reportGeneralTypeIssues] + ) + + +@pytest.fixture() +def to_section(topic_config: TopicConfig) -> ToSection: + return ToSection(topics={TopicName("topic-X"): topic_config}) + + def test_load_schema_handler(): config_enable = PipelineConfig( defaults_path=Path("fake"), @@ -70,13 +87,9 @@ def test_load_schema_handler(): is None ) - assert ( - type( - SchemaHandler.load_schema_handler( - TEST_SCHEMA_PROVIDER_MODULE, config_enable - ) - ) - is SchemaHandler + assert isinstance( + SchemaHandler.load_schema_handler(TEST_SCHEMA_PROVIDER_MODULE, config_enable), + SchemaHandler, ) @@ -148,17 +161,11 @@ def test_should_raise_value_error_when_schema_provider_is_called_and_components_ def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( - log_info_mock: MagicMock, schema_registry_mock: MagicMock + to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock ): schema_handler = SchemaHandler( url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) - topic_config = TopicConfig( - type=OutputTopicTypes.OUTPUT, - key_schema=None, - value_schema="com.bakdata.kpops.test.SchemaHandlerTest", - ) - to_section = ToSection(topics={"topic-X": topic_config}) schema_registry_mock.get_versions.return_value = [] @@ -171,17 +178,14 @@ def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( - log_info_mock: MagicMock, schema_registry_mock: MagicMock + topic_config: TopicConfig, + to_section: ToSection, + log_info_mock: MagicMock, + schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) - topic_config = TopicConfig( - type=OutputTopicTypes.OUTPUT, - key_schema=None, - value_schema="com.bakdata.kpops.test.SchemaHandlerTest", - ) - to_section = ToSection(topics={"topic-X": topic_config}) schema_registry_mock.get_versions.return_value = [1, 2, 3] schema_registry_mock.check_version.return_value = None @@ -196,19 +200,15 @@ def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatible_and_dry_run_true( - log_info_mock: MagicMock, schema_registry_mock: MagicMock + topic_config: TopicConfig, + to_section: ToSection, + schema_registry_mock: MagicMock, ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" - topic_config = TopicConfig( - type=OutputTopicTypes.OUTPUT, - key_schema=None, - value_schema=schema_class, - ) - to_section = ToSection(topics={"topic-X": topic_config}) schema_registry_mock.get_versions.return_value = [1, 2, 3] schema_registry_mock.check_version.return_value = None @@ -217,29 +217,38 @@ def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatibl with pytest.raises(Exception) as exception: schema_handler.submit_schemas(to_section, True) + assert "Schema is not compatible for" in str(exception.value) + EXPECTED_SCHEMA = { + "type": "record", + "name": "KPOps.Employee", + "fields": [ + {"name": "Name", "type": "string"}, + {"name": "Age", "type": "int"}, + ], + } schema = schema_provider.provide_schema(schema_class, {}) + assert isinstance(schema, AvroSchema) + assert schema.flat_schema == EXPECTED_SCHEMA assert ( str(exception.value) - == f"Schema is not compatible for topic-X-value and model {topic_config.value_schema}. \n {json.dumps(schema.flat_schema, indent=4)}" + == f"Schema is not compatible for topic-X-value and model {topic_config.value_schema}. \n {json.dumps(EXPECTED_SCHEMA, indent=4)}" ) schema_registry_mock.register.assert_not_called() def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_version_and_dry_run_true( - log_info_mock: MagicMock, log_debug_mock: MagicMock, schema_registry_mock: MagicMock + topic_config: TopicConfig, + to_section: ToSection, + log_info_mock: MagicMock, + log_debug_mock: MagicMock, + schema_registry_mock: MagicMock, ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" - topic_config = TopicConfig( - type=OutputTopicTypes.OUTPUT, - key_schema=None, - value_schema=schema_class, - ) - to_section = ToSection(topics={"topic-X": topic_config}) schema = schema_provider.provide_schema(schema_class, {}) registered_version = SchemaVersion(topic_config.value_schema, 1, schema, 1) @@ -264,7 +273,10 @@ def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_ve def test_should_submit_non_existing_schema_when_not_dry( - log_info_mock: MagicMock, schema_registry_mock: MagicMock + topic_config: TopicConfig, + to_section: ToSection, + log_info_mock: MagicMock, + schema_registry_mock: MagicMock, ): schema_provider = TestSchemaProvider() schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" @@ -272,12 +284,6 @@ def test_should_submit_non_existing_schema_when_not_dry( schema_handler = SchemaHandler( url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) - topic_config = TopicConfig( - type=OutputTopicTypes.OUTPUT, - key_schema=None, - value_schema=schema_class, - ) - to_section = ToSection(topics={"topic-X": topic_config}) schema_registry_mock.get_versions.return_value = [] @@ -295,17 +301,13 @@ def test_should_submit_non_existing_schema_when_not_dry( def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( - log_info_mock: MagicMock, schema_registry_mock: MagicMock + to_section: ToSection, + log_info_mock: MagicMock, + schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) - topic_config = TopicConfig( - type=OutputTopicTypes.OUTPUT, - key_schema=None, - value_schema="com.bakdata.kpops.test.SchemaHandlerTest", - ) - to_section = ToSection(topics={"topic-X": topic_config}) schema_registry_mock.get_versions.return_value = [] @@ -319,17 +321,11 @@ def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( def test_should_delete_schemas_when_not_in_dry_run( - log_info_mock: MagicMock, schema_registry_mock: MagicMock + to_section: ToSection, schema_registry_mock: MagicMock ): schema_handler = SchemaHandler( url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE ) - topic_config = TopicConfig( - type=OutputTopicTypes.OUTPUT, - key_schema=None, - value_schema="com.bakdata.kpops.test.SchemaHandlerTest", - ) - to_section = ToSection(topics={"topic-X": topic_config}) schema_registry_mock.get_versions.return_value = [] diff --git a/tests/component_handlers/topic/test_topic_handler.py b/tests/component_handlers/topic/test_topic_handler.py index df5f72c83..c53a7a60d 100644 --- a/tests/component_handlers/topic/test_topic_handler.py +++ b/tests/component_handlers/topic/test_topic_handler.py @@ -18,6 +18,7 @@ TopicResponse, TopicSpec, ) +from kpops.components.base_components.models import TopicName from kpops.components.base_components.models.to_section import ( OutputTopicTypes, TopicConfig, @@ -101,7 +102,7 @@ def test_should_call_create_topic_with_dry_run_false(self): replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=False) @@ -130,7 +131,7 @@ def test_should_call_update_topic_config_when_topic_exists_and_with_dry_run_fals replication_factor=3, configs={"cleanup.policy": "delete", "delete.retention.ms": "123456789"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=False) @@ -157,7 +158,7 @@ def test_should_update_topic_config_when_one_config_changed( replication_factor=3, configs={"cleanup.policy": "delete", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=False) @@ -179,7 +180,7 @@ def test_should_not_update_topic_config_when_config_not_changed( replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=False) @@ -200,7 +201,7 @@ def test_should_not_update_topic_config_when_config_not_changed_and_not_ordered( replication_factor=3, configs={"compression.type": "gzip", "cleanup.policy": "compact"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=False) @@ -222,7 +223,7 @@ def test_should_call_reset_topic_config_when_topic_exists_dry_run_false_and_topi replication_factor=3, configs={"cleanup.policy": "compact"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=False) @@ -243,7 +244,7 @@ def test_should_not_call_create_topics_with_dry_run_true_and_topic_not_exists(se replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=True) @@ -264,7 +265,7 @@ def test_should_print_message_with_dry_run_true_and_topic_not_exists( replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=True) @@ -289,7 +290,7 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_same_partition_co replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=True) wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff @@ -323,7 +324,7 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_default_partition type=OutputTopicTypes.OUTPUT, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.create_topics(to_section=to_section, dry_run=True) wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff @@ -361,7 +362,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_partition_count( replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) with pytest.raises( TopicTransactionError, @@ -383,7 +384,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_replication_factor( replication_factor=300, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) with pytest.raises( TopicTransactionError, @@ -405,7 +406,7 @@ def test_should_log_correct_message_when_delete_existing_topic_dry_run( replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.delete_topics(to_section, True) @@ -430,7 +431,7 @@ def test_should_log_correct_message_when_delete_non_existing_topic_dry_run( replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.delete_topics(to_section, True) @@ -449,7 +450,7 @@ def test_should_call_delete_topic_not_dry_run(self): replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.delete_topics(to_section, False) @@ -472,7 +473,7 @@ def test_should_print_correct_warning_when_deleting_topic_that_does_not_exists_n replication_factor=3, configs={"cleanup.policy": "compact", "compression.type": "gzip"}, ) - to_section = ToSection(topics={"topic-X": topic_config}) + to_section = ToSection(topics={TopicName("topic-X"): topic_config}) topic_handler.delete_topics(to_section, False) wrapper.get_topic.assert_called_once_with(topic_name="topic-X") diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index dc9484916..1b3f5229f 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -145,7 +145,7 @@ def test_inherit(self, config: PipelineConfig, handlers: ComponentHandlers): component = TestChildModel( config=config, handlers=handlers, - **{"name": "name-defined-in-pipeline_generator"}, + name="name-defined-in-pipeline_generator", ) assert ( diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 59ef2fe96..00f6817a4 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -20,6 +20,7 @@ FromSection, FromTopic, InputTopicTypes, + TopicName, ) from kpops.components.base_components.models.to_section import ( OutputTopicTypes, @@ -59,7 +60,7 @@ def handlers(self) -> ComponentHandlers: topic_handler=MagicMock(), ) - @pytest.fixture + @pytest.fixture(autouse=True) def helm_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( "kpops.components.base_components.kafka_connector.Helm" @@ -71,12 +72,31 @@ def dry_run_handler(self, mocker: MockerFixture) -> MagicMock: "kpops.components.base_components.kafka_connector.DryRunHandler" ).return_value + @pytest.fixture + def connector( + self, config: PipelineConfig, handlers: ComponentHandlers + ) -> KafkaSinkConnector: + return KafkaSinkConnector( + name=CONNECTOR_NAME, + config=config, + handlers=handlers, + app=KafkaConnectConfig(), + namespace="test-namespace", + to=ToSection( + topics={ + TopicName("${output_topic_name}"): TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + } + ), + ) + def test_connector_config_parsing( self, config: PipelineConfig, handlers: ComponentHandlers ): topic_name = "connector-topic" connector = KafkaSinkConnector( - name="test-connector", + name=CONNECTOR_NAME, config=config, handlers=handlers, app=KafkaConnectConfig(**{"topics": topic_name}), @@ -86,7 +106,7 @@ def test_connector_config_parsing( topic_pattern = ".*" connector = KafkaSinkConnector( - name="test-connector", + name=CONNECTOR_NAME, config=config, handlers=handlers, app=KafkaConnectConfig(**{"topics.regex": topic_pattern}), @@ -97,15 +117,15 @@ def test_connector_config_parsing( def test_from_section_parsing_input_topic( self, config: PipelineConfig, handlers: ComponentHandlers ): - topic1 = "connector-topic1" - topic2 = "connector-topic2" + topic1 = TopicName("connector-topic1") + topic2 = TopicName("connector-topic2") connector = KafkaSinkConnector( - name="test-connector", + name=CONNECTOR_NAME, config=config, handlers=handlers, app=KafkaConnectConfig(), namespace="test-namespace", - from_=FromSection( + from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={ topic1: FromTopic(type=InputTopicTypes.INPUT), topic2: FromTopic(type=InputTopicTypes.INPUT), @@ -121,14 +141,14 @@ def test_from_section_parsing_input_topic( def test_from_section_parsing_input_pattern( self, config: PipelineConfig, handlers: ComponentHandlers ): - topic_pattern = ".*" + topic_pattern = TopicName(".*") connector = KafkaSinkConnector( - name="test-connector", + name=CONNECTOR_NAME, config=config, handlers=handlers, app=KafkaConnectConfig(), namespace="test-namespace", - from_=FromSection( + from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={topic_pattern: FromTopic(type=InputTopicTypes.INPUT_PATTERN)} ), ) @@ -136,25 +156,9 @@ def test_from_section_parsing_input_pattern( def test_deploy_order( self, - config: PipelineConfig, - handlers: ComponentHandlers, + connector: KafkaSinkConnector, mocker: MockerFixture, ): - connector = KafkaSinkConnector( - name="test-connector", - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - mock_create_topics = mocker.patch.object( connector.handlers.topic_handler, "create_topics" ) @@ -169,7 +173,7 @@ def test_deploy_order( assert mock.mock_calls == [ mocker.call.mock_create_topics(to_section=connector.to, dry_run=True), mocker.call.mock_create_connector( - connector_name="test-connector", + connector_name=CONNECTOR_NAME, kafka_connect_config=connector.app, dry_run=True, ), @@ -177,25 +181,9 @@ def test_deploy_order( def test_destroy( self, - config: PipelineConfig, - handlers: ComponentHandlers, + connector: KafkaSinkConnector, mocker: MockerFixture, ): - connector = KafkaSinkConnector( - name="test-connector", - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - mock_destroy_connector = mocker.patch.object( connector.handlers.connector_handler, "destroy_connector" ) @@ -203,32 +191,15 @@ def test_destroy( connector.destroy(dry_run=True) mock_destroy_connector.assert_called_once_with( - connector_name="test-connector", + connector_name=CONNECTOR_NAME, dry_run=True, ) def test_reset_when_dry_run_is_true( self, - config: PipelineConfig, - handlers: ComponentHandlers, - helm_mock: MagicMock, + connector: KafkaSinkConnector, dry_run_handler: MagicMock, ): - connector = KafkaSinkConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - dry_run = True connector.reset(dry_run=dry_run) @@ -236,27 +207,11 @@ def test_reset_when_dry_run_is_true( def test_reset_when_dry_run_is_false( self, - config: PipelineConfig, - handlers: ComponentHandlers, + connector: KafkaSinkConnector, helm_mock: MagicMock, dry_run_handler: MagicMock, mocker: MockerFixture, ): - connector = KafkaSinkConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - mock_delete_topics = mocker.patch.object( connector.handlers.topic_handler, "delete_topics" ) @@ -313,32 +268,16 @@ def test_reset_when_dry_run_is_false( def test_clean_when_dry_run_is_true( self, - config: PipelineConfig, - handlers: ComponentHandlers, - helm_mock: MagicMock, + connector: KafkaSinkConnector, dry_run_handler: MagicMock, ): - connector = KafkaSinkConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - dry_run = True connector.clean(dry_run=dry_run) dry_run_handler.print_helm_diff.assert_called_once() def test_clean_when_dry_run_is_false( self, + connector: KafkaSinkConnector, config: PipelineConfig, handlers: ComponentHandlers, helm_mock: MagicMock, @@ -354,7 +293,7 @@ def test_clean_when_dry_run_is_false( namespace="test-namespace", to=ToSection( topics={ - "${output_topic_name}": TopicConfig( + TopicName("${output_topic_name}"): TopicConfig( type=OutputTopicTypes.OUTPUT, partitions_count=10 ), } @@ -434,7 +373,6 @@ def test_clean_without_to_when_dry_run_is_true( self, config: PipelineConfig, handlers: ComponentHandlers, - helm_mock: MagicMock, dry_run_handler: MagicMock, ): connector = KafkaSinkConnector( diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 669d93001..b93477c4a 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -17,6 +17,7 @@ FromSection, FromTopic, InputTopicTypes, + TopicName, ) from kpops.components.base_components.models.to_section import ( OutputTopicTypes, @@ -52,7 +53,7 @@ def handlers(self) -> ComponentHandlers: topic_handler=MagicMock(), ) - @pytest.fixture + @pytest.fixture(autouse=True) def helm_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( "kpops.components.base_components.kafka_connector.Helm" @@ -64,44 +65,50 @@ def dry_run_handler(self, mocker: MockerFixture) -> MagicMock: "kpops.components.base_components.kafka_connector.DryRunHandler" ).return_value + @pytest.fixture + def connector( + self, config: PipelineConfig, handlers: ComponentHandlers + ) -> KafkaSourceConnector: + return KafkaSourceConnector( + name=CONNECTOR_NAME, + config=config, + handlers=handlers, + app=KafkaConnectConfig(), + namespace="test-namespace", + to=ToSection( + topics={ + TopicName("${output_topic_name}"): TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + } + ), + offset_topic="kafka-connect-offsets", + ) + def test_from_section_raises_exception( self, config: PipelineConfig, handlers: ComponentHandlers ): with pytest.raises(NotImplementedError): KafkaSourceConnector( - name="test-connector", + name=CONNECTOR_NAME, config=config, handlers=handlers, app=KafkaConnectConfig(), namespace="test-namespace", - from_=FromSection( + from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={ - "connector-topic": FromTopic(type=InputTopicTypes.INPUT), + TopicName("connector-topic"): FromTopic( + type=InputTopicTypes.INPUT + ), } ), ) def test_deploy_order( self, - config: PipelineConfig, - handlers: ComponentHandlers, + connector: KafkaSourceConnector, mocker: MockerFixture, ): - connector = KafkaSourceConnector( - name="test-connector", - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - mock_create_topics = mocker.patch.object( connector.handlers.topic_handler, "create_topics" ) @@ -117,7 +124,7 @@ def test_deploy_order( assert mock.mock_calls == [ mocker.call.mock_create_topics(to_section=connector.to, dry_run=True), mocker.call.mock_create_connector( - connector_name="test-connector", + connector_name=CONNECTOR_NAME, kafka_connect_config=connector.app, dry_run=True, ), @@ -125,25 +132,10 @@ def test_deploy_order( def test_destroy( self, - config: PipelineConfig, - handlers: ComponentHandlers, + connector: KafkaSourceConnector, mocker: MockerFixture, ): ENV["KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC"] = "kafka-connect-offsets" - connector = KafkaSourceConnector( - name="test-connector", - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) assert connector.handlers.connector_handler mock_destroy_connector = mocker.patch.object( @@ -153,33 +145,15 @@ def test_destroy( connector.destroy(dry_run=True) mock_destroy_connector.assert_called_once_with( - connector_name="test-connector", + connector_name=CONNECTOR_NAME, dry_run=True, ) def test_reset_when_dry_run_is_true( self, - config: PipelineConfig, - handlers: ComponentHandlers, - helm_mock: MagicMock, + connector: KafkaSourceConnector, dry_run_handler: MagicMock, ): - connector = KafkaSourceConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - offset_topic="kafka-connect-offsets", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - assert connector.handlers.connector_handler connector.reset(dry_run=True) @@ -188,28 +162,11 @@ def test_reset_when_dry_run_is_true( def test_reset_when_dry_run_is_false( self, - config: PipelineConfig, - handlers: ComponentHandlers, + connector: KafkaSourceConnector, dry_run_handler: MagicMock, helm_mock: MagicMock, mocker: MockerFixture, ): - connector = KafkaSourceConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - offset_topic="kafka-connect-offsets", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( connector.handlers.topic_handler, "delete_topics" @@ -266,26 +223,9 @@ def test_reset_when_dry_run_is_false( def test_clean_when_dry_run_is_true( self, - config: PipelineConfig, - handlers: ComponentHandlers, - helm_mock: MagicMock, + connector: KafkaSourceConnector, dry_run_handler: MagicMock, ): - connector = KafkaSourceConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - offset_topic="kafka-connect-offsets", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) assert connector.handlers.connector_handler connector.clean(dry_run=True) @@ -294,27 +234,11 @@ def test_clean_when_dry_run_is_true( def test_clean_when_dry_run_is_false( self, - config: PipelineConfig, - handlers: ComponentHandlers, + connector: KafkaSourceConnector, helm_mock: MagicMock, dry_run_handler: MagicMock, mocker: MockerFixture, ): - connector = KafkaSourceConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=KafkaConnectConfig(), - namespace="test-namespace", - offset_topic="kafka-connect-offsets", - to=ToSection( - topics={ - "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index f2f8a5d7e..77c75cb28 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -12,6 +12,7 @@ RepoAuthFlags, ) from kpops.components import StreamsApp +from kpops.components.base_components.models import TopicName from kpops.components.base_components.models.to_section import ( OutputTopicTypes, TopicConfig, @@ -237,12 +238,16 @@ def test_weave_inputs_from_prev_component( streams_app.weave_from_topics( ToSection( topics={ - "prev-output-topic": TopicConfig( + TopicName("prev-output-topic"): TopicConfig( type=OutputTopicTypes.OUTPUT, partitions_count=10 ), - "b": TopicConfig(type=OutputTopicTypes.OUTPUT, partitions_count=10), - "a": TopicConfig(type=OutputTopicTypes.OUTPUT, partitions_count=10), - "prev-error-topic": TopicConfig( + TopicName("b"): TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + TopicName("a"): TopicConfig( + type=OutputTopicTypes.OUTPUT, partitions_count=10 + ), + TopicName("prev-error-topic"): TopicConfig( type=OutputTopicTypes.ERROR, partitions_count=10 ), } diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index e9a6c202f..4dfb2a0b0 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -9,6 +9,7 @@ ) from kpops.components import KafkaSinkConnector from kpops.components.base_components import PipelineComponent +from kpops.components.base_components.models import TopicName from kpops.components.base_components.models.to_section import ( OutputTopicTypes, TopicConfig, @@ -55,10 +56,10 @@ def inflate(self) -> list[PipelineComponent]: }, to=ToSection( topics={ - "${component_type}": TopicConfig( + TopicName("${component_type}"): TopicConfig( type=OutputTopicTypes.OUTPUT ), - "${component_name}": TopicConfig( + TopicName("${component_name}"): TopicConfig( type=OutputTopicTypes.EXTRA, role="test" ), } @@ -71,10 +72,9 @@ def inflate(self) -> list[PipelineComponent]: handlers=self.handlers, to=ToSection( topics={ - f"{self.name}-" - + "${component_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT - ) + TopicName( + f"{self.name}-" + "${component_name}" + ): TopicConfig(type=OutputTopicTypes.OUTPUT) } ).dict(), )