diff --git a/kpops/cli/custom_formatter.py b/kpops/cli/custom_formatter.py
index 69fc1c73d..ef977d24f 100644
--- a/kpops/cli/custom_formatter.py
+++ b/kpops/cli/custom_formatter.py
@@ -16,7 +16,7 @@ def format(self, record):
             logging.WARNING: typer.style(message_format, fg=typer.colors.YELLOW),
             logging.ERROR: typer.style(message_format, fg=typer.colors.RED),
             logging.CRITICAL: typer.style(
-                message_format, fg=typer.colors.RED, bold=True
+                message_format, fg=typer.colors.RED, bold=True,
             ),
         }
diff --git a/kpops/cli/main.py b/kpops/cli/main.py
index 8a70c47f9..540e4d5c1 100644
--- a/kpops/cli/main.py
+++ b/kpops/cli/main.py
@@ -121,12 +121,12 @@ def setup_pipeline(
     handlers = setup_handlers(components_module, pipeline_config)
 
     return Pipeline.load_from_yaml(
-        pipeline_base_dir, pipeline_path, registry, pipeline_config, handlers
+        pipeline_base_dir, pipeline_path, registry, pipeline_config, handlers,
     )
 
 
 def setup_handlers(
-    components_module: str | None, config: PipelineConfig
+    components_module: str | None, config: PipelineConfig,
 ) -> ComponentHandlers:
     schema_handler = SchemaHandler.load_schema_handler(components_module, config)
     connector_handler = KafkaConnectHandler.from_pipeline_config(config)
@@ -149,13 +149,13 @@ def get_step_names(steps_to_apply: list[PipelineComponent]) -> list[str]:
 
 
 def filter_steps_to_apply(
-    pipeline: Pipeline, steps: set[str], filter_type: FilterType
+    pipeline: Pipeline, steps: set[str], filter_type: FilterType,
 ) -> list[PipelineComponent]:
     def is_in_steps(component: PipelineComponent) -> bool:
         return component.name in steps
 
     log.debug(
-        f"KPOPS_PIPELINE_STEPS is defined with values: {steps} and filter type of {filter_type.value}"
+        f"KPOPS_PIPELINE_STEPS is defined with values: {steps} and filter type of {filter_type.value}",
     )
     filtered_steps = [
         component
@@ -171,7 +171,7 @@ def is_in_steps(component: PipelineComponent) -> bool:
 
 
 def get_steps_to_apply(
-    pipeline: Pipeline, steps: str | None, filter_type: FilterType
+    pipeline: Pipeline, steps: str | None, filter_type: FilterType,
 ) -> list[PipelineComponent]:
     if steps:
         return filter_steps_to_apply(pipeline, parse_steps(steps), filter_type)
@@ -179,7 +179,7 @@ def get_steps_to_apply(
 
 
 def reverse_pipeline_steps(
-    pipeline: Pipeline, steps: str | None, filter_type: FilterType
+    pipeline: Pipeline, steps: str | None, filter_type: FilterType,
 ) -> Iterator[PipelineComponent]:
     return reversed(get_steps_to_apply(pipeline, steps, filter_type))
@@ -193,7 +193,7 @@ def log_action(action: str, pipeline_component: PipelineComponent):
 
 
 def create_pipeline_config(
-    config: Path, defaults: Optional[Path], verbose: bool
+    config: Path, defaults: Optional[Path], verbose: bool,
 ) -> PipelineConfig:
     setup_logging_level(verbose)
     PipelineConfig.Config.config_path = config
@@ -210,7 +210,7 @@ def create_pipeline_config(
     Generate json schema.
 
     The schemas can be used to enable support for kpops files in a text editor.
-    """
+    """,
 )
 def schema(
     scope: SchemaScope = typer.Argument(
@@ -225,7 +225,7 @@ def schema(
     ),
     components_module: Optional[str] = COMPONENTS_MODULES,
     include_stock_components: bool = typer.Option(
-        default=True, help="Include the built-in KPOps components."
+        default=True, help="Include the built-in KPOps components.",
     ),
 ) -> None:
     match scope:
@@ -236,7 +236,7 @@ def schema(
 
 
 @app.command(  # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8
-    help="Enriches pipelines steps with defaults. The output is used as input for the deploy/destroy/... commands."
+ help="Enriches pipelines steps with defaults. The output is used as input for the deploy/destroy/... commands.", ) def generate( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -251,7 +251,7 @@ def generate( ) -> Pipeline: pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, pipeline_config, ) if not template: @@ -264,14 +264,14 @@ def generate( elif steps: log.warning( "The following flags are considered only when `--template` is set: \n \ - '--steps'" + '--steps'", ) return pipeline @app.command( - help="Deploy pipeline steps" + help="Deploy pipeline steps", ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def deploy( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -286,7 +286,7 @@ def deploy( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, pipeline_config, ) steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type) @@ -296,7 +296,7 @@ def deploy( @app.command( - help="Destroy pipeline steps" + help="Destroy pipeline steps", ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def destroy( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -311,7 +311,7 @@ def destroy( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, pipeline_config, ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: @@ -320,7 +320,7 @@ def destroy( @app.command( - help="Reset pipeline steps" + help="Reset pipeline steps", ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def reset( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -335,7 +335,7 @@ def reset( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, pipeline_config, ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: @@ -345,7 +345,7 @@ def reset( @app.command( - help="Clean pipeline steps" + help="Clean pipeline steps", ) # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 def clean( pipeline_path: Path = PIPELINE_PATH_ARG, @@ -360,7 +360,7 @@ def clean( ): pipeline_config = create_pipeline_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, pipeline_config, ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: diff --git a/kpops/cli/pipeline_config.py b/kpops/cli/pipeline_config.py index 1400323f5..eb30b7c99 100644 --- a/kpops/cli/pipeline_config.py +++ b/kpops/cli/pipeline_config.py @@ -112,7 +112,7 @@ def customise_sources( env_settings: SettingsSourceCallable, file_secret_settings: SettingsSourceCallable, ) -> tuple[ - SettingsSourceCallable | Callable[[PipelineConfig], dict[str, Any]], ... 
+            SettingsSourceCallable | Callable[[PipelineConfig], dict[str, Any]], ...,
         ]:
             return (
                 env_settings,
diff --git a/kpops/cli/registry.py b/kpops/cli/registry.py
index 97a910bcd..fc40f8938 100644
--- a/kpops/cli/registry.py
+++ b/kpops/cli/registry.py
@@ -39,7 +39,7 @@ def __getitem__(self, component_type: str) -> type[PipelineComponent]:
             return self._classes[component_type]
         except KeyError as ke:
             raise ClassNotFoundError(
-                f"Could not find a component of type {component_type}"
+                f"Could not find a component of type {component_type}",
             ) from ke
@@ -56,7 +56,7 @@ def _find_classes(module_name: str, baseclass: type[T]) -> Iterator[type[T]]:
         if issubclass(_class, baseclass):
             # filter out internal kpops classes unless specifically requested
             if _class.__module__.startswith(
-                KPOPS_MODULE
+                KPOPS_MODULE,
             ) and not module_name.startswith(KPOPS_MODULE):
                 continue
             yield _class
diff --git a/kpops/component_handlers/helm_wrapper/dry_run_handler.py b/kpops/component_handlers/helm_wrapper/dry_run_handler.py
index 2d28957b7..7b1429dab 100644
--- a/kpops/component_handlers/helm_wrapper/dry_run_handler.py
+++ b/kpops/component_handlers/helm_wrapper/dry_run_handler.py
@@ -18,7 +18,7 @@ def print_helm_diff(self, stdout: str, helm_release_name: str, log: Logger) -> N
         :param log: The Logger object of the component class
         """
         current_release = list(
-            self._helm.get_manifest(helm_release_name, self.namespace)
+            self._helm.get_manifest(helm_release_name, self.namespace),
         )
         if current_release:
             log.info(f"Helm release {helm_release_name} already exists")
diff --git a/kpops/component_handlers/helm_wrapper/helm.py b/kpops/component_handlers/helm_wrapper/helm.py
index 95c8ee4f0..0eb519cb6 100644
--- a/kpops/component_handlers/helm_wrapper/helm.py
+++ b/kpops/component_handlers/helm_wrapper/helm.py
@@ -30,7 +30,7 @@ def __init__(self, helm_config: HelmConfig) -> None:
         self._version = self.get_version()
         if self._version.major != 3:
             raise RuntimeError(
-                f"The supported Helm version is 3.x.x. The current Helm version is {self._version.major}.{self._version.minor}.{self._version.patch}"
+                f"The supported Helm version is 3.x.x. The current Helm version is {self._version.major}.{self._version.minor}.{self._version.patch}",
             )
 
     def add_repo(
@@ -121,7 +121,7 @@ def uninstall(
             return self.__execute(command)
         except ReleaseNotFoundException:
             log.warning(
-                f"Release with name {release_name} not found. Could not uninstall app."
+                f"Release with name {release_name} not found. Could not uninstall app.",
             )
 
     def template(
@@ -184,7 +184,7 @@ def get_version(self) -> Version:
         version_match = re.search(r"^v(\d+(?:\.\d+){0,2})", short_version)
         if version_match is None:
             raise RuntimeError(
-                f"Could not parse the Helm version.\n\nHelm output:\n{short_version}"
+                f"Could not parse the Helm version.\n\nHelm output:\n{short_version}",
             )
         version = map(int, version_match.group(1).split("."))
         return Version(*version)
diff --git a/kpops/component_handlers/helm_wrapper/model.py b/kpops/component_handlers/helm_wrapper/model.py
index 35dc43e0d..93e0116e2 100644
--- a/kpops/component_handlers/helm_wrapper/model.py
+++ b/kpops/component_handlers/helm_wrapper/model.py
@@ -31,19 +31,19 @@ class RepoAuthFlags(BaseModel):
     """
 
     username: str | None = Field(
-        default=None, description=describe_attr("username", __doc__)
+        default=None, description=describe_attr("username", __doc__),
     )
     password: str | None = Field(
-        default=None, description=describe_attr("password", __doc__)
+        default=None, description=describe_attr("password", __doc__),
     )
     ca_file: Path | None = Field(
-        default=None, description=describe_attr("ca_file", __doc__)
+        default=None, description=describe_attr("ca_file", __doc__),
     )
     cert_file: Path | None = Field(
-        default=None, description=describe_attr("cert_file", __doc__)
+        default=None, description=describe_attr("cert_file", __doc__),
     )
     insecure_skip_tls_verify: bool = Field(
-        default=False, description=describe_attr("insecure_skip_tls_verify", __doc__)
+        default=False, description=describe_attr("insecure_skip_tls_verify", __doc__),
     )
 
     class Config(DescConfig):
@@ -73,11 +73,11 @@ class HelmRepoConfig(BaseModel):
     """
 
     repository_name: str = Field(
-        default=..., description=describe_attr("repository_name", __doc__)
+        default=..., description=describe_attr("repository_name", __doc__),
     )
     url: str = Field(default=..., description=describe_attr("url", __doc__))
     repo_auth_flags: RepoAuthFlags = Field(
-        default=RepoAuthFlags(), description=describe_attr("repo_auth_flags", __doc__)
+        default=RepoAuthFlags(), description=describe_attr("repo_auth_flags", __doc__),
     )
 
     class Config(DescConfig):
@@ -131,7 +131,7 @@ def to_command(self) -> list[str]:
                 [
                     "--set-file",
                     ",".join([f"{key}={path}" for key, path in self.set_file.items()]),
-                ]
+                ],
             )
         if self.create_namespace:
             command.append("--create-namespace")
diff --git a/kpops/component_handlers/helm_wrapper/utils.py b/kpops/component_handlers/helm_wrapper/utils.py
index 7ad76b93a..e05ee187f 100644
--- a/kpops/component_handlers/helm_wrapper/utils.py
+++ b/kpops/component_handlers/helm_wrapper/utils.py
@@ -16,7 +16,7 @@ def trim_release_name(name: str, suffix: str = "") -> str:
     if len(name) > RELEASE_NAME_MAX_LEN:
         new_name = name[: (RELEASE_NAME_MAX_LEN - len(suffix))] + suffix
         log.critical(
-            f"Invalid Helm release name '{name}'. Truncating to {RELEASE_NAME_MAX_LEN} characters: \n {name} --> {new_name}"
+            f"Invalid Helm release name '{name}'. Truncating to {RELEASE_NAME_MAX_LEN} characters: \n {name} --> {new_name}",
         )
         name = new_name
     return name
diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py
index ccd9666e3..7f81abb56 100644
--- a/kpops/component_handlers/kafka_connect/connect_wrapper.py
+++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py
@@ -36,7 +36,7 @@ def host(self) -> str:
         return self._host
 
     def create_connector(
-        self, connector_config: KafkaConnectorConfig
+        self, connector_config: KafkaConnectorConfig,
     ) -> KafkaConnectResponse:
         """Create a new connector.
 
@@ -47,7 +47,7 @@ def create_connector(
         config_json = connector_config.dict()
         connect_data = {"name": connector_config.name, "config": config_json}
         response = httpx.post(
-            url=f"{self._host}/connectors", headers=HEADERS, json=connect_data
+            url=f"{self._host}/connectors", headers=HEADERS, json=connect_data,
         )
         if response.status_code == httpx.codes.CREATED:
             log.info(f"Connector {connector_config.name} created.")
@@ -55,7 +55,7 @@ def create_connector(
             return KafkaConnectResponse(**response.json())
         elif response.status_code == httpx.codes.CONFLICT:
             log.warning(
-                "Rebalancing in progress while creating a connector... Retrying..."
+                "Rebalancing in progress while creating a connector... Retrying...",
             )
             time.sleep(1)
             self.create_connector(connector_config)
@@ -71,7 +71,7 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse:
         :return: Information about the connector.
         """
         response = httpx.get(
-            url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
+            url=f"{self._host}/connectors/{connector_name}", headers=HEADERS,
         )
         if response.status_code == httpx.codes.OK:
             log.info(f"Connector {connector_name} exists.")
@@ -82,14 +82,14 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse:
             raise ConnectorNotFoundException()
         elif response.status_code == httpx.codes.CONFLICT:
             log.warning(
-                "Rebalancing in progress while getting a connector... Retrying..."
+                "Rebalancing in progress while getting a connector... Retrying...",
             )
             sleep(1)
             self.get_connector(connector_name)
         raise KafkaConnectError(response)
 
     def update_connector_config(
-        self, connector_config: KafkaConnectorConfig
+        self, connector_config: KafkaConnectorConfig,
     ) -> KafkaConnectResponse:
         """Create or update a connector.
 
@@ -117,14 +117,14 @@ def update_connector_config(
             return KafkaConnectResponse(**data)
         elif response.status_code == httpx.codes.CONFLICT:
             log.warning(
-                "Rebalancing in progress while updating a connector... Retrying..."
+                "Rebalancing in progress while updating a connector... Retrying...",
             )
             sleep(1)
             self.update_connector_config(connector_config)
         raise KafkaConnectError(response)
 
     def validate_connector_config(
-        self, connector_config: KafkaConnectorConfig
+        self, connector_config: KafkaConnectorConfig,
     ) -> list[str]:
         """Validate connector config using the given configuration.
@@ -140,7 +140,7 @@ def validate_connector_config(
         if response.status_code == httpx.codes.OK:
             kafka_connect_error_response = KafkaConnectConfigErrorResponse(
-                **response.json()
+                **response.json(),
             )
 
             errors: list[str] = []
@@ -149,7 +149,7 @@ def validate_connector_config(
                 if len(config.value.errors) > 0:
                     for error in config.value.errors:
                         errors.append(
-                            f"Found error for field {config.value.name}: {error}"
+                            f"Found error for field {config.value.name}: {error}",
                         )
             return errors
         raise KafkaConnectError(response)
@@ -163,7 +163,7 @@ def delete_connector(self, connector_name: str) -> None:
         :raises ConnectorNotFoundException: Connector not found
         """
         response = httpx.delete(
-            url=f"{self._host}/connectors/{connector_name}", headers=HEADERS
+            url=f"{self._host}/connectors/{connector_name}", headers=HEADERS,
         )
         if response.status_code == httpx.codes.NO_CONTENT:
             log.info(f"Connector {connector_name} deleted.")
@@ -173,7 +173,7 @@ def delete_connector(self, connector_name: str) -> None:
             raise ConnectorNotFoundException()
         elif response.status_code == httpx.codes.CONFLICT:
             log.warning(
-                "Rebalancing in progress while deleting a connector... Retrying..."
+                "Rebalancing in progress while deleting a connector... Retrying...",
             )
             sleep(1)
             self.delete_connector(connector_name)
diff --git a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py
index 766c76b28..fb81aa411 100644
--- a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py
+++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py
@@ -34,7 +34,7 @@ def __init__(
         self._timeout = timeout
 
     def create_connector(
-        self, connector_config: KafkaConnectorConfig, *, dry_run: bool
+        self, connector_config: KafkaConnectorConfig, *, dry_run: bool,
     ) -> None:
         """Create a connector.
 
@@ -54,7 +54,7 @@ def create_connector(
             timeout(
                 lambda: self._connect_wrapper.update_connector_config(
-                    connector_config
+                    connector_config,
                 ),
                 secs=self._timeout,
             )
@@ -86,11 +86,11 @@ def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None:
             )
         except ConnectorNotFoundException:
             log.warning(
-                f"Connector Destruction: the connector {connector_name} does not exist. Skipping."
+                f"Connector Destruction: the connector {connector_name} does not exist. Skipping.",
             )
 
     def __dry_run_connector_creation(
-        self, connector_config: KafkaConnectorConfig
+        self, connector_config: KafkaConnectorConfig,
     ) -> None:
         connector_name = connector_config.name
         try:
@@ -106,7 +106,7 @@ def __dry_run_connector_creation(
         except ConnectorNotFoundException:
             diff = render_diff({}, connector_config.dict())
             log.info(
-                f"Connector Creation: connector {connector_name} does not exist. Creating connector with config:\n{diff}"
+                f"Connector Creation: connector {connector_name} does not exist. Creating connector with config:\n{diff}",
             )
             log.debug("POST /connectors HTTP/1.1")
             log.debug(f"HOST: {self._connect_wrapper.host}")
@@ -115,11 +115,11 @@ def __dry_run_connector_creation(
         if len(errors) > 0:
             formatted_errors = "\n".join(errors)
             raise ConnectorStateException(
-                f"Connector Creation: validating the connector config for connector {connector_name} resulted in the following errors: {formatted_errors}"
+                f"Connector Creation: validating the connector config for connector {connector_name} resulted in the following errors: {formatted_errors}",
             )
         else:
             log.info(
-                f"Connector Creation: connector config for {connector_name} is valid!"
+ f"Connector Creation: connector config for {connector_name} is valid!", ) def __dry_run_connector_deletion(self, connector_name: str) -> None: @@ -127,14 +127,14 @@ def __dry_run_connector_deletion(self, connector_name: str) -> None: self._connect_wrapper.get_connector(connector_name) log.info( magentaify( - f"Connector Destruction: connector {connector_name} already exists. Deleting connector." - ) + f"Connector Destruction: connector {connector_name} already exists. Deleting connector.", + ), ) log.debug(f"DELETE /connectors/{connector_name} HTTP/1.1") log.debug(f"HOST: {self._connect_wrapper.host}") except ConnectorNotFoundException: log.warning( - f"Connector Destruction: connector {connector_name} does not exist and cannot be deleted. Skipping." + f"Connector Destruction: connector {connector_name} does not exist and cannot be deleted. Skipping.", ) @classmethod diff --git a/kpops/component_handlers/kafka_connect/timeout.py b/kpops/component_handlers/kafka_connect/timeout.py index 6f0fd788d..cc9b0d127 100644 --- a/kpops/component_handlers/kafka_connect/timeout.py +++ b/kpops/component_handlers/kafka_connect/timeout.py @@ -30,5 +30,5 @@ async def main_supervisor(func: Callable[..., T], secs: int) -> T: return complete except TimeoutError: log.error( - f"Kafka Connect operation {func.__name__} timed out after {secs} seconds. To increase the duration, set the `timeout` option in config.yaml." + f"Kafka Connect operation {func.__name__} timed out after {secs} seconds. To increase the duration, set the `timeout` option in config.yaml.", ) diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index e5e13e70e..8c2065f7a 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -30,19 +30,19 @@ def schema_provider(self) -> SchemaProvider: try: if not self.components_module: raise ValueError( - f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." + f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists.", ) schema_provider_class = find_class(self.components_module, SchemaProvider) return schema_provider_class() # pyright: ignore[reportGeneralTypeIssues] except ClassNotFoundError as e: raise ValueError( f"No schema provider found in components module {self.components_module}. " - f"Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." 
+ f"Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}.", ) from e @classmethod def load_schema_handler( - cls, components_module: str | None, config: PipelineConfig + cls, components_module: str | None, config: PipelineConfig, ) -> SchemaHandler | None: if not config.schema_registry_url: return None @@ -58,14 +58,14 @@ def submit_schemas(self, to_section: ToSection, dry_run: bool = True) -> None: key_schema_class = config.key_schema if value_schema_class is not None: schema = self.schema_provider.provide_schema( - value_schema_class, to_section.models + value_schema_class, to_section.models, ) self.__submit_value_schema( - schema, value_schema_class, dry_run, topic_name + schema, value_schema_class, dry_run, topic_name, ) if key_schema_class is not None: schema = self.schema_provider.provide_schema( - key_schema_class, to_section.models + key_schema_class, to_section.models, ) self.__submit_key_schema(schema, key_schema_class, dry_run, topic_name) @@ -119,25 +119,25 @@ def __submit_schema( else: log.info( greenify( - f"Schema Submission: The subject {subject} will be submitted." - ) + f"Schema Submission: The subject {subject} will be submitted.", + ), ) else: self.schema_registry_client.register(subject=subject, schema=schema) log.info( - f"Schema Submission: schema submitted for {subject} with model {schema_class}." + f"Schema Submission: schema submitted for {subject} with model {schema_class}.", ) def __subject_exists(self, subject: str) -> bool: return len(self.schema_registry_client.get_versions(subject)) > 0 def __check_compatibility( - self, schema: Schema, schema_class: str, subject: str + self, schema: Schema, schema_class: str, subject: str, ) -> None: registered_version = self.schema_registry_client.check_version(subject, schema) if registered_version is None: if not self.schema_registry_client.test_compatibility( - subject=subject, schema=schema + subject=subject, schema=schema, ): schema_str = ( schema.flat_schema @@ -145,15 +145,15 @@ def __check_compatibility( else str(schema) ) raise Exception( - f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}" + f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}", ) else: log.debug( - f"Schema Submission: schema was already submitted for the subject {subject} as version {registered_version.schema}. Therefore, the specified schema must be compatible." + f"Schema Submission: schema was already submitted for the subject {subject} as version {registered_version.schema}. Therefore, the specified schema must be compatible.", ) log.info( - f"Schema Submission: compatible schema for {subject} with model {schema_class}." + f"Schema Submission: compatible schema for {subject} with model {schema_class}.", ) def __delete_subject(self, subject: str, dry_run: bool) -> None: @@ -162,5 +162,5 @@ def __delete_subject(self, subject: str, dry_run: bool) -> None: else: version_list = self.schema_registry_client.delete_subject(subject) log.info( - f"Schema Deletion: deleted {len(version_list)} versions for subject {subject}." 
+ f"Schema Deletion: deleted {len(version_list)} versions for subject {subject}.", ) diff --git a/kpops/component_handlers/schema_handler/schema_provider.py b/kpops/component_handlers/schema_handler/schema_provider.py index 2b93bf943..ba7990ce1 100644 --- a/kpops/component_handlers/schema_handler/schema_provider.py +++ b/kpops/component_handlers/schema_handler/schema_provider.py @@ -13,6 +13,6 @@ class SchemaProvider(ABC): @abstractmethod def provide_schema( - self, schema_class: str, models: dict[ModelName, ModelVersion] + self, schema_class: str, models: dict[ModelName, ModelVersion], ) -> Schema: ... diff --git a/kpops/component_handlers/topic/handler.py b/kpops/component_handlers/topic/handler.py index afc4f6e77..b436b20e3 100644 --- a/kpops/component_handlers/topic/handler.py +++ b/kpops/component_handlers/topic/handler.py @@ -35,10 +35,10 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: try: self.proxy_wrapper.get_topic(topic_name=topic_name) topic_config_in_cluster = self.proxy_wrapper.get_topic_config( - topic_name=topic_name + topic_name=topic_name, ) differences = self.__get_topic_config_diff( - topic_config_in_cluster, topic_config.configs + topic_config_in_cluster, topic_config.configs, ) if differences: @@ -46,11 +46,11 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: for difference in differences: if difference.diff_type is DiffType.REMOVE: json_body.append( - {"name": difference.key, "operation": "DELETE"} + {"name": difference.key, "operation": "DELETE"}, ) elif config_value := difference.change.new_value: json_body.append( - {"name": difference.key, "value": config_value} + {"name": difference.key, "value": config_value}, ) self.proxy_wrapper.batch_alter_topic_config( topic_name=topic_name, @@ -59,7 +59,7 @@ def create_topics(self, to_section: ToSection, dry_run: bool) -> None: else: log.info( - f"Topic Creation: config of topic {topic_name} didn't change. Skipping update." + f"Topic Creation: config of topic {topic_name} didn't change. Skipping update.", ) except TopicNotFoundException: self.proxy_wrapper.create_topic(topic_spec=topic_spec) @@ -74,15 +74,15 @@ def delete_topics(self, to_section: ToSection, dry_run: bool) -> None: self.proxy_wrapper.delete_topic(topic_name=topic_name) except TopicNotFoundException: log.warning( - f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping." + f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. 
Skipping.", ) @staticmethod def __get_topic_config_diff( - cluster_config: TopicConfigResponse, current_config: dict + cluster_config: TopicConfigResponse, current_config: dict, ) -> list[Diff]: comparable_in_cluster_config_dict, _ = parse_rest_proxy_topic_config( - cluster_config + cluster_config, ) return list(Diff.from_dicts(comparable_in_cluster_config_dict, current_config)) @@ -97,10 +97,10 @@ def __dry_run_topic_creation( topic_name = topic_in_cluster.topic_name if topic_config: topic_config_in_cluster = self.proxy_wrapper.get_topic_config( - topic_name=topic_name + topic_name=topic_name, ) in_cluster_config, new_config = parse_and_compare_topic_configs( - topic_config_in_cluster, topic_config.configs + topic_config_in_cluster, topic_config.configs, ) if diff := render_diff(in_cluster_config, new_config): log.info(f"Config changes for topic {topic_name}:") @@ -120,13 +120,13 @@ def __dry_run_topic_creation( self.__check_partition_count(topic_in_cluster, topic_spec, effective_config) self.__check_replication_factor( - topic_in_cluster, topic_spec, effective_config + topic_in_cluster, topic_spec, effective_config, ) except TopicNotFoundException: log.info( greenify( - f"Topic Creation: {topic_name} does not exist in the cluster. Creating topic." - ) + f"Topic Creation: {topic_name} does not exist in the cluster. Creating topic.", + ), ) log.debug(f"POST /clusters/{self.proxy_wrapper.cluster_id}/topics HTTP/1.1") log.debug(f"Host: {self.proxy_wrapper.host}") @@ -145,11 +145,11 @@ def __check_partition_count( topic_spec.partitions_count or int(broker_config["num.partitions"]) ): log.debug( - f"Topic Creation: partition count of topic {topic_name} did not change. Current partitions count {partition_count}. Updating configs." + f"Topic Creation: partition count of topic {topic_name} did not change. Current partitions count {partition_count}. Updating configs.", ) else: raise TopicTransactionError( - f"Topic Creation: partition count of topic {topic_name} changed! Partitions count of topic {topic_name} is {partition_count}. The given partitions count {topic_spec.partitions_count}." + f"Topic Creation: partition count of topic {topic_name} changed! Partitions count of topic {topic_name} is {partition_count}. The given partitions count {topic_spec.partitions_count}.", ) @staticmethod @@ -165,11 +165,11 @@ def __check_replication_factor( or int(broker_config["default.replication.factor"]) ): log.debug( - f"Topic Creation: replication factor of topic {topic_name} did not change. Current replication factor {replication_factor}. Updating configs." + f"Topic Creation: replication factor of topic {topic_name} did not change. Current replication factor {replication_factor}. Updating configs.", ) else: raise TopicTransactionError( - f"Topic Creation: replication factor of topic {topic_name} changed! Replication factor of topic {topic_name} is {replication_factor}. The given replication count {topic_spec.replication_factor}." + f"Topic Creation: replication factor of topic {topic_name} changed! Replication factor of topic {topic_name} is {replication_factor}. The given replication count {topic_spec.replication_factor}.", ) def __dry_run_topic_deletion(self, topic_name: str) -> None: @@ -177,15 +177,15 @@ def __dry_run_topic_deletion(self, topic_name: str) -> None: topic_in_cluster = self.proxy_wrapper.get_topic(topic_name=topic_name) log.info( magentaify( - f"Topic Deletion: topic {topic_in_cluster.topic_name} exists in the cluster. Deleting topic." 
- ) + f"Topic Deletion: topic {topic_in_cluster.topic_name} exists in the cluster. Deleting topic.", + ), ) log.debug( - f"DELETE /clusters/{self.proxy_wrapper.cluster_id}/topics HTTP/1.1" + f"DELETE /clusters/{self.proxy_wrapper.cluster_id}/topics HTTP/1.1", ) except TopicNotFoundException: log.warning( - f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping." + f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping.", ) log.debug(f"Host: {self.proxy_wrapper.host}") log.debug(HEADERS) diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index 88fc6e310..1a36e4b50 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -26,7 +26,7 @@ class ProxyWrapper: def __init__(self, pipeline_config: PipelineConfig) -> None: if not pipeline_config.kafka_rest_host: raise ValueError( - "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." + "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST.", ) self._host = pipeline_config.kafka_rest_host diff --git a/kpops/component_handlers/topic/utils.py b/kpops/component_handlers/topic/utils.py index 70f71d0b3..0b5576d1f 100644 --- a/kpops/component_handlers/topic/utils.py +++ b/kpops/component_handlers/topic/utils.py @@ -6,17 +6,17 @@ def parse_and_compare_topic_configs( - topic_config_in_cluster: TopicConfigResponse, topic_config: dict + topic_config_in_cluster: TopicConfigResponse, topic_config: dict, ) -> tuple[dict, dict]: comparable_in_cluster_config_dict, default_configs = parse_rest_proxy_topic_config( - topic_config_in_cluster + topic_config_in_cluster, ) cluster_defaults_overwrite = set(topic_config.keys()) - set( - comparable_in_cluster_config_dict.keys() + comparable_in_cluster_config_dict.keys(), ) config_overwrites = set(comparable_in_cluster_config_dict.keys()) - set( - topic_config.keys() + topic_config.keys(), ) populate_default_configs( cluster_defaults_overwrite, diff --git a/kpops/component_handlers/utils/exception.py b/kpops/component_handlers/utils/exception.py index fe906190f..00bdca315 100644 --- a/kpops/component_handlers/utils/exception.py +++ b/kpops/component_handlers/utils/exception.py @@ -11,7 +11,7 @@ def __init__(self, response: httpx.Response) -> None: self.error_msg = "Something went wrong!" try: log.error( - f"The request responded with the code {self.error_code}. Error body: {response.json()}" + f"The request responded with the code {self.error_code}. 
Error body: {response.json()}", ) response.raise_for_status() except httpx.HTTPError as e: diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index a02cc1417..e789182ff 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -93,17 +93,17 @@ def extend_with_defaults(self, **kwargs) -> dict: config: PipelineConfig = kwargs["config"] log.debug( typer.style( - "Enriching component of type ", fg=typer.colors.GREEN, bold=False + "Enriching component of type ", fg=typer.colors.GREEN, bold=False, ) + typer.style( - kwargs.get("type"), fg=typer.colors.GREEN, bold=True, underline=True - ) + kwargs.get("type"), fg=typer.colors.GREEN, bold=True, underline=True, + ), ) main_default_file_path, environment_default_file_path = get_defaults_file_paths( - config + config, ) defaults = load_defaults( - self.__class__, main_default_file_path, environment_default_file_path + self.__class__, main_default_file_path, environment_default_file_path, ) kwargs = update_nested(kwargs, defaults) return kwargs @@ -166,13 +166,13 @@ def defaults_from_yaml(path: Path, key: str) -> dict: content = load_yaml_file(path, substitution=ENV) if not isinstance(content, dict): raise TypeError( - "Default files should be structured as map ([app type] -> [default config]" + "Default files should be structured as map ([app type] -> [default config]", ) value = content.get(key) if value is None: return {} log.debug( - f"\tFound defaults for component type {typer.style(key, bold=True, fg=typer.colors.MAGENTA)} in file: {path}" + f"\tFound defaults for component type {typer.style(key, bold=True, fg=typer.colors.MAGENTA)} in file: {path}", ) return value @@ -189,11 +189,11 @@ def get_defaults_file_paths(config: PipelineConfig) -> tuple[Path, Path]: """ defaults_dir = Path(config.defaults_path).resolve() main_default_file_path = defaults_dir / Path( - config.defaults_filename_prefix + config.defaults_filename_prefix, ).with_suffix(".yaml") environment_default_file_path = defaults_dir / Path( - f"{config.defaults_filename_prefix}_{config.environment}" + f"{config.defaults_filename_prefix}_{config.environment}", ).with_suffix(".yaml") return main_default_file_path, environment_default_file_path diff --git a/kpops/components/base_components/kafka_app.py b/kpops/components/base_components/kafka_app.py index c6919bf3e..c217a3c92 100644 --- a/kpops/components/base_components/kafka_app.py +++ b/kpops/components/base_components/kafka_app.py @@ -30,7 +30,7 @@ class KafkaStreamsConfig(BaseModel): brokers: str = Field(default=..., description=describe_attr("brokers", __doc__)) schema_registry_url: str | None = Field( - default=None, description=describe_attr("schema_registry_url", __doc__) + default=None, description=describe_attr("schema_registry_url", __doc__), ) class Config(CamelCaseConfig, DescConfig): @@ -45,10 +45,10 @@ class KafkaAppConfig(KubernetesAppConfig): """ streams: KafkaStreamsConfig = Field( - default=..., description=describe_attr("streams", __doc__) + default=..., description=describe_attr("streams", __doc__), ) name_override: str | None = Field( - default=None, description=describe_attr("name_override", __doc__) + default=None, description=describe_attr("name_override", __doc__), ) @@ -89,12 +89,12 @@ def clean_up_helm_chart(self) -> str: def deploy(self, dry_run: bool) -> None: if self.to: self.handlers.topic_handler.create_topics( - to_section=self.to, 
dry_run=dry_run + to_section=self.to, dry_run=dry_run, ) if self.handlers.schema_handler: self.handlers.schema_handler.submit_schemas( - to_section=self.to, dry_run=dry_run + to_section=self.to, dry_run=dry_run, ) super().deploy(dry_run) @@ -113,7 +113,7 @@ def _run_clean_up_job( """ suffix = "-clean" clean_up_release_name = trim_release_name( - self.helm_release_name + suffix, suffix + self.helm_release_name + suffix, suffix, ) log.info(f"Uninstall old cleanup job for {clean_up_release_name}") @@ -122,7 +122,7 @@ def _run_clean_up_job( log.info(f"Init cleanup job for {clean_up_release_name}") stdout = self.__install_clean_up_job( - clean_up_release_name, suffix, values, dry_run + clean_up_release_name, suffix, values, dry_run, ) if dry_run: diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index b63aaebda..2d6c8c8bc 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -65,7 +65,7 @@ class KafkaConnector(PipelineComponent, ABC): description=describe_attr("repo_config", __doc__), ) version: str | None = Field( - default="1.0.4", description=describe_attr("version", __doc__) + default="1.0.4", description=describe_attr("version", __doc__), ) resetter_values: dict = Field( default_factory=dict, @@ -138,12 +138,12 @@ def template_flags(self) -> HelmTemplateFlags: def deploy(self, dry_run: bool) -> None: if self.to: self.handlers.topic_handler.create_topics( - to_section=self.to, dry_run=dry_run + to_section=self.to, dry_run=dry_run, ) if self.handlers.schema_handler: self.handlers.schema_handler.submit_schemas( - to_section=self.to, dry_run=dry_run + to_section=self.to, dry_run=dry_run, ) self.handlers.connector_handler.create_connector(self.app, dry_run=dry_run) @@ -151,7 +151,7 @@ def deploy(self, dry_run: bool) -> None: @override def destroy(self, dry_run: bool) -> None: self.handlers.connector_handler.destroy_connector( - self.full_name, dry_run=dry_run + self.full_name, dry_run=dry_run, ) @override @@ -159,7 +159,7 @@ def clean(self, dry_run: bool) -> None: if self.to: if self.handlers.schema_handler: self.handlers.schema_handler.delete_schemas( - to_section=self.to, dry_run=dry_run + to_section=self.to, dry_run=dry_run, ) self.handlers.topic_handler.delete_topics(self.to, dry_run=dry_run) @@ -181,22 +181,22 @@ def _run_connect_resetter( """ log.info( magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}" - ) + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}", + ), ) self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) log.info( magentaify( - f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}" - ) + f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}", + ), ) stdout = self.__install_connect_resetter(dry_run, **kwargs) if dry_run: self.dry_run_handler.print_helm_diff( - stdout, self._resetter_release_name, log + stdout, self._resetter_release_name, log, ) if not retain_clean_jobs: @@ -359,7 +359,7 @@ def clean(self, dry_run: bool) -> None: self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True) def __run_kafka_connect_resetter( - self, dry_run: bool, delete_consumer_group: bool + self, dry_run: bool, delete_consumer_group: bool, ) -> None: """Run the connector resetter. 
diff --git a/kpops/components/base_components/kubernetes_app.py b/kpops/components/base_components/kubernetes_app.py
index 57c51a187..b2676da7c 100644
--- a/kpops/components/base_components/kubernetes_app.py
+++ b/kpops/components/base_components/kubernetes_app.py
@@ -25,7 +25,7 @@
 log = logging.getLogger("KubernetesAppComponent")
 
 KUBERNETES_NAME_CHECK_PATTERN = re.compile(
-    r"^(?![0-9]+$)(?!.*-$)(?!-)[a-z0-9-.]{1,253}(?…
+    r"^(?![0-9]+$)(?!.*-$)(?!-)[a-z0-9-.]{1,253}(?…,
 )
@@ … @@ … -> str:
     def helm_chart(self) -> str:
         """Return component's Helm chart."""
         raise NotImplementedError(
-            f"Please implement the helm_chart property of the {self.__module__} module."
+            f"Please implement the helm_chart property of the {self.__module__} module.",
         )
 
     @property
@@ -171,7 +171,7 @@ def print_helm_diff(self, stdout: str) -> None:
         :param stdout: The output of a Helm command that installs or upgrades the release
         """
         current_release = list(
-            self.helm.get_manifest(self.helm_release_name, self.namespace)
+            self.helm.get_manifest(self.helm_release_name, self.namespace),
         )
         if current_release:
             log.info(f"Helm release {self.helm_release_name} already exists")
diff --git a/kpops/components/base_components/models/from_section.py b/kpops/components/base_components/models/from_section.py
index fdef7782f..aea159eb2 100644
--- a/kpops/components/base_components/models/from_section.py
+++ b/kpops/components/base_components/models/from_section.py
@@ -27,7 +27,7 @@ class FromTopic(BaseModel):
     """
 
     type: InputTopicTypes | None = Field(
-        default=None, description=describe_attr("type", __doc__)
+        default=None, description=describe_attr("type", __doc__),
     )
     role: str | None = Field(default=None, description=describe_attr("role", __doc__))
diff --git a/kpops/components/base_components/models/to_section.py b/kpops/components/base_components/models/to_section.py
index c10f27c23..00393ee4e 100644
--- a/kpops/components/base_components/models/to_section.py
+++ b/kpops/components/base_components/models/to_section.py
@@ -31,7 +31,7 @@ class TopicConfig(BaseModel):
     """
 
     type: OutputTopicTypes | None = Field(
-        default=None, title="Topic type", description=describe_attr("type", __doc__)
+        default=None, title="Topic type", description=describe_attr("type", __doc__),
     )
     key_schema: str | None = Field(
         default=None,
@@ -54,7 +54,7 @@ class TopicConfig(BaseModel):
         description=describe_attr("replication_factor", __doc__),
     )
     configs: dict[str, str | int] = Field(
-        default={}, description=describe_attr("configs", __doc__)
+        default={}, description=describe_attr("configs", __doc__),
     )
     role: str | None = Field(default=None, description=describe_attr("role", __doc__))
@@ -79,10 +79,10 @@ class ToSection(BaseModel):
     """
 
     topics: dict[TopicName, TopicConfig] = Field(
-        default={}, description=describe_attr("topics", __doc__)
+        default={}, description=describe_attr("topics", __doc__),
     )
     models: dict[ModelName, ModelVersion] = Field(
-        default={}, description=describe_attr("models", __doc__)
+        default={}, description=describe_attr("models", __doc__),
     )
 
     class Config(DescConfig):
diff --git a/kpops/components/streams_bootstrap/producer/model.py b/kpops/components/streams_bootstrap/producer/model.py
index 8af1a68c6..ad948bfcc 100644
--- a/kpops/components/streams_bootstrap/producer/model.py
+++ b/kpops/components/streams_bootstrap/producer/model.py
@@ -15,10 +15,10 @@ class ProducerStreamsConfig(KafkaStreamsConfig):
     """
 
     extra_output_topics: dict[str, str] = Field(
-        default={}, description=describe_attr("extra_output_topics", __doc__)
+        default={}, description=describe_attr("extra_output_topics", __doc__),
     )
     output_topic: str | None = Field(
-        default=None, description=describe_attr("output_topic", __doc__)
+        default=None, description=describe_attr("output_topic", __doc__),
     )
@@ -29,7 +29,7 @@ class ProducerValues(KafkaAppConfig):
     """
 
     streams: ProducerStreamsConfig = Field(
-        default=..., description=describe_attr("streams", __doc__)
+        default=..., description=describe_attr("streams", __doc__),
     )
 
     class Config(BaseConfig):
diff --git a/kpops/components/streams_bootstrap/streams/model.py b/kpops/components/streams_bootstrap/streams/model.py
index ece5c042b..28ec5059e 100644
--- a/kpops/components/streams_bootstrap/streams/model.py
+++ b/kpops/components/streams_bootstrap/streams/model.py
@@ -27,28 +27,28 @@ class StreamsConfig(KafkaStreamsConfig):
     """
 
     input_topics: list[str] = Field(
-        default=[], description=describe_attr("input_topics", __doc__)
+        default=[], description=describe_attr("input_topics", __doc__),
     )
     input_pattern: str | None = Field(
-        default=None, description=describe_attr("input_pattern", __doc__)
+        default=None, description=describe_attr("input_pattern", __doc__),
     )
     extra_input_topics: dict[str, list[str]] = Field(
-        default={}, description=describe_attr("extra_input_topics", __doc__)
+        default={}, description=describe_attr("extra_input_topics", __doc__),
     )
     extra_input_patterns: dict[str, str] = Field(
-        default={}, description=describe_attr("extra_input_patterns", __doc__)
+        default={}, description=describe_attr("extra_input_patterns", __doc__),
     )
     extra_output_topics: dict[str, str] = Field(
-        default={}, description=describe_attr("extra_output_topics", __doc__)
+        default={}, description=describe_attr("extra_output_topics", __doc__),
     )
     output_topic: str | None = Field(
-        default=None, description=describe_attr("output_topic", __doc__)
+        default=None, description=describe_attr("output_topic", __doc__),
     )
     error_topic: str | None = Field(
-        default=None, description=describe_attr("error_topic", __doc__)
+        default=None, description=describe_attr("error_topic", __doc__),
     )
     config: dict[str, str] = Field(
-        default={}, description=describe_attr("config", __doc__)
+        default={}, description=describe_attr("config", __doc__),
     )
 
     def add_input_topics(self, topics: list[str]) -> None:
@@ -69,7 +69,7 @@ def add_extra_input_topics(self, role: str, topics: list[str]) -> None:
         :param role: Topic role
         """
         self.extra_input_topics[role] = deduplicate(
-            self.extra_input_topics.get(role, []) + topics
+            self.extra_input_topics.get(role, []) + topics,
         )
 
     @override
diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py
index 7b3515e1c..0ed15e9b1 100644
--- a/kpops/pipeline_generator/pipeline.py
+++ b/kpops/pipeline_generator/pipeline.py
@@ -64,13 +64,13 @@ def validate_unique_names(self) -> None:
         duplicates = [name for name, count in Counter(step_names).items() if count > 1]
         if duplicates:
             raise ValidationError(
-                f"step names should be unique. duplicate step names: {', '.join(duplicates)}"
+                f"step names should be unique. duplicate step names: {', '.join(duplicates)}",
             )
 
     @staticmethod
     def _populate_component_name(component: PipelineComponent) -> None:  # TODO: remove
         with suppress(
-            AttributeError  # Some components like Kafka Connect do not have a name_override attribute
+            AttributeError,  # Some components like Kafka Connect do not have a name_override attribute
         ):
             if (app := getattr(component, "app")) and app.name_override is None:
                 app.name_override = component.full_name
@@ -88,7 +88,7 @@ def create_env_components_index(
     for component in environment_components:
         if "type" not in component or "name" not in component:
             raise ValueError(
-                "To override components per environment, every component should at least have a type and a name."
+                "To override components per environment, every component should at least have a type and a name.",
             )
         index[component["name"]] = component
     return index
@@ -138,14 +138,14 @@ def load_from_yaml(
         main_content = load_yaml_file(path, substitution=ENV)
         if not isinstance(main_content, list):
             raise TypeError(
-                f"The pipeline definition {path} should contain a list of components"
+                f"The pipeline definition {path} should contain a list of components",
             )
         env_content = []
         if (env_file := Pipeline.pipeline_filename_environment(path, config)).exists():
             env_content = load_yaml_file(env_file, substitution=ENV)
             if not isinstance(env_content, list):
                 raise TypeError(
-                    f"The pipeline definition {env_file} should contain a list of components"
+                    f"The pipeline definition {env_file} should contain a list of components",
                 )
 
         pipeline = cls(main_content, env_content, registry, config, handlers)
@@ -165,20 +165,20 @@ def parse_components(self, component_list: list[dict]) -> None:
                     component_type: str = component_data["type"]
                 except KeyError as ke:
                     raise ValueError(
-                        "Every component must have a type defined, this component does not have one."
+                        "Every component must have a type defined, this component does not have one.",
                     ) from ke
                 component_class = self.registry[component_type]
                 self.apply_component(component_class, component_data)
            except Exception as ex:  # noqa: BLE001
                 if "name" in component_data:
                     raise ParsingException(
-                        f"Error enriching {component_data['type']} component {component_data['name']}"
+                        f"Error enriching {component_data['type']} component {component_data['name']}",
                     ) from ex
                 else:
                     raise ParsingException() from ex
 
     def apply_component(
-        self, component_class: type[PipelineComponent], component_data: dict
+        self, component_class: type[PipelineComponent], component_data: dict,
     ) -> None:
         """Instantiate, enrich and inflate pipeline component.
@@ -205,14 +205,14 @@ def apply_component(
                     from_topic,
                 ) in enriched_component.from_.components.items():
                     original_from_component = self.components.find(
-                        original_from_component_name
+                        original_from_component_name,
                     )
                     inflated_from_component = original_from_component.inflate()[-1]
                     resolved_from_component = self.components.find(
-                        inflated_from_component.name
+                        inflated_from_component.name,
                     )
                     enriched_component.weave_from_topics(
-                        resolved_from_component.to, from_topic
+                        resolved_from_component.to, from_topic,
                     )
             elif self.components:
                 # read from previous component
@@ -260,7 +260,7 @@ def print_yaml(self, substitution: dict | None = None) -> None:
             theme="ansi_dark",
         )
         Console(
-            width=1000  # HACK: overwrite console width to avoid truncating output
+            width=1000,  # HACK: overwrite console width to avoid truncating output
         ).print(syntax)
 
     def __iter__(self) -> Iterator[PipelineComponent]:
@@ -269,8 +269,8 @@ def __iter__(self) -> Iterator[PipelineComponent]:
     def __str__(self) -> str:
         return yaml.dump(
             json.loads(  # HACK: serialize types on Pydantic model export, which are not serialized by .dict(); e.g. pathlib.Path
-                self.components.json(exclude_none=True, by_alias=True)
-            )
+                self.components.json(exclude_none=True, by_alias=True),
+            ),
         )
 
     def __len__(self) -> int:
@@ -296,14 +296,14 @@ def substitute_in_component(self, component_as_dict: dict) -> dict:
             substitution_hardcoded,
         )
         substitution = generate_substitution(
-            json.loads(config.json()), existing_substitution=component_substitution
+            json.loads(config.json()), existing_substitution=component_substitution,
         )
 
         return json.loads(
             substitute_nested(
                 json.dumps(component_as_dict),
                 **update_nested_pair(substitution, ENV),
-            )
+            ),
         )
 
     def validate(self) -> None:
diff --git a/kpops/utils/dict_differ.py b/kpops/utils/dict_differ.py
index dbc53f67c..da47bd620 100644
--- a/kpops/utils/dict_differ.py
+++ b/kpops/utils/dict_differ.py
@@ -51,7 +51,7 @@ class Diff(Generic[T]):
     @staticmethod
     def from_dicts(
-        d1: dict, d2: dict, ignore: set[str] | None = None
+        d1: dict, d2: dict, ignore: set[str] | None = None,
     ) -> Iterator[Diff]:
         for diff_type, keys, changes in diff(d1, d2, ignore=ignore):
             if not isinstance(changes, list):
@@ -86,8 +86,8 @@ def render_diff(d1: dict, d2: dict, ignore: set[str] | None = None) -> str | Non
             differ.compare(
                 to_yaml(d1) if d1 else "",
                 to_yaml(d2_filtered) if d2_filtered else "",
-            )
-        )
+            ),
+        ),
     )
diff --git a/kpops/utils/dict_ops.py b/kpops/utils/dict_ops.py
index 94c9003f4..0f4643043 100644
--- a/kpops/utils/dict_ops.py
+++ b/kpops/utils/dict_ops.py
@@ -47,7 +47,7 @@ def update_nested(*argv: dict) -> dict:
 
 
 def flatten_mapping(
-    nested_mapping: Mapping[str, Any], prefix: str | None = None, separator: str = "_"
+    nested_mapping: Mapping[str, Any], prefix: str | None = None, separator: str = "_",
 ) -> dict[str, Any]:
     """Flattens a Mapping.
diff --git a/kpops/utils/environment.py b/kpops/utils/environment.py
index 0ed7ae920..b1b2271b4 100644
--- a/kpops/utils/environment.py
+++ b/kpops/utils/environment.py
@@ -13,7 +13,7 @@ def __init__(self, mapping=None, /, **kwargs) -> None:
             mapping = {}
         if kwargs:
             mapping.update(
-                {transformation(key): value for key, value in kwargs.items()}
+                {transformation(key): value for key, value in kwargs.items()},
             )
         super().__init__(mapping)
diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py
index f202d0706..51c64ce2c 100644
--- a/kpops/utils/gen_schema.py
+++ b/kpops/utils/gen_schema.py
@@ -37,7 +37,7 @@ def field_schema(field: ModelField, **kwargs: Any) -> Any:
 
 
 def _is_valid_component(
-    defined_component_types: set[str], component: type[PipelineComponent]
+    defined_component_types: set[str], component: type[PipelineComponent],
 ) -> bool:
     """Check whether a PipelineComponent subclass has a valid definition for the schema generation.
 
@@ -56,7 +56,7 @@ def _is_valid_component(
 
 
 def _add_components(
-    components_module: str, components: tuple[type[PipelineComponent]] | None = None
+    components_module: str, components: tuple[type[PipelineComponent]] | None = None,
 ) -> tuple[type[PipelineComponent]]:
     """Add components to a components tuple.
 
@@ -82,7 +82,7 @@ def _add_components(
 
 
 def gen_pipeline_schema(
-    components_module: str | None = None, include_stock_components: bool = True
+    components_module: str | None = None, include_stock_components: bool = True,
 ) -> None:
     """Generate a json schema from the models of pipeline components.
 
@@ -123,7 +123,7 @@ def gen_pipeline_schema(
     )
 
     AnnotatedPipelineComponents = Annotated[
-        PipelineComponents, Field(discriminator="type")
+        PipelineComponents, Field(discriminator="type"),
     ]
 
     schema = schema_json_of(
@@ -139,6 +139,6 @@ def gen_pipeline_schema(
 def gen_config_schema() -> None:
     """Generate a json schema from the model of pipeline config."""
     schema = schema_json_of(
-        PipelineConfig, title="KPOps config schema", indent=4, sort_keys=True
+        PipelineConfig, title="KPOps config schema", indent=4, sort_keys=True,
     )
     print(schema)
diff --git a/kpops/utils/yaml_loading.py b/kpops/utils/yaml_loading.py
index cb9536200..9dc53c1ab 100644
--- a/kpops/utils/yaml_loading.py
+++ b/kpops/utils/yaml_loading.py
@@ -9,7 +9,7 @@ def generate_hashkey(
-    file_path: Path, substitution: Mapping[str, Any] | None = None
+    file_path: Path, substitution: Mapping[str, Any] | None = None,
 ) -> tuple:
     if substitution is None:
         substitution = {}
@@ -18,7 +18,7 @@ def generate_hashkey(
 
 @cached(cache={}, key=generate_hashkey)
 def load_yaml_file(
-    file_path: Path, *, substitution: Mapping[str, Any] | None = None
+    file_path: Path, *, substitution: Mapping[str, Any] | None = None,
 ) -> dict | list[dict]:
     with open(file_path) as yaml_file:
         return yaml.load(substitute(yaml_file.read(), substitution), Loader=yaml.Loader)
@@ -71,6 +71,6 @@ def substitute_nested(input: str, **kwargs) -> str:
         old_str, new_str = new_str, substitute(new_str, kwargs)
     if new_str != old_str:
         raise ValueError(
-            "An infinite loop condition detected. Check substitution variables."
+            "An infinite loop condition detected. Check substitution variables.",
         )
     return old_str
diff --git a/tests/cli/resources/module.py b/tests/cli/resources/module.py
index 3956eedf2..4014d6ec4 100644
--- a/tests/cli/resources/module.py
+++ b/tests/cli/resources/module.py
@@ -9,6 +9,6 @@ class CustomSchemaProvider(SchemaProvider):
     def provide_schema(
-        self, schema_class: str, models: dict[ModelName, ModelVersion]
+        self, schema_class: str, models: dict[ModelName, ModelVersion],
     ) -> Schema:
         return AvroSchema()
diff --git a/tests/cli/test_pipeline_steps.py b/tests/cli/test_pipeline_steps.py
index a09d7b064..1d1cafbf1 100644
--- a/tests/cli/test_pipeline_steps.py
+++ b/tests/cli/test_pipeline_steps.py
@@ -45,7 +45,7 @@ def log_info(mocker: MockerFixture) -> MagicMock:
 
 def tests_filter_steps_to_apply(log_info: MagicMock, pipeline: Pipeline):
     filtered_steps = get_steps_to_apply(
-        pipeline, "example2,example3", FilterType.INCLUDE
+        pipeline, "example2,example3", FilterType.INCLUDE,
     )
 
     assert len(filtered_steps) == 2
@@ -54,7 +54,7 @@ def tests_filter_steps_to_apply(log_info: MagicMock, pipeline: Pipeline):
     assert log_info.call_count == 1
     log_info.assert_any_call(
-        "The following steps are included:\n['example2', 'example3']"
+        "The following steps are included:\n['example2', 'example3']",
     )
 
     filtered_steps = get_steps_to_apply(pipeline, None, FilterType.INCLUDE)
@@ -66,7 +66,7 @@ def tests_filter_steps_to_apply(log_info: MagicMock, pipeline: Pipeline):
 
 def tests_filter_steps_to_exclude(log_info: MagicMock, pipeline: Pipeline):
     filtered_steps = get_steps_to_apply(
-        pipeline, "example2,example3", FilterType.EXCLUDE
+        pipeline, "example2,example3", FilterType.EXCLUDE,
     )
 
     assert len(filtered_steps) == 1
diff --git a/tests/cli/test_schema_generation.py b/tests/cli/test_schema_generation.py
index 5223c4c21..fe66a5990 100644
--- a/tests/cli/test_schema_generation.py
+++ b/tests/cli/test_schema_generation.py
@@ -75,7 +75,7 @@ class SubPipelineComponentCorrectDocstr(SubPipelineComponent):
     """
 
     example_attr: str = Field(
-        default=..., description=describe_attr("example_attr", __doc__)
+        default=..., description=describe_attr("example_attr", __doc__),
     )
 
 
 @pytest.mark.filterwarnings(
-    "ignore:handlers", "ignore:config", "ignore:enrich", "ignore:validate"
+    "ignore:handlers", "ignore:config", "ignore:enrich", "ignore:validate",
 )
 class TestGenSchema:
     def test_gen_pipeline_schema_no_modules(self, caplog: pytest.LogCaptureFixture):
@@ -101,7 +101,7 @@ def test_gen_pipeline_schema_no_modules(self, caplog: pytest.LogCaptureFixture):
                 "root",
                 logging.WARNING,
                 "No components are provided, no schema is generated.",
-            )
+            ),
         ]
         assert result.exit_code == 0
diff --git a/tests/component_handlers/helm_wrapper/test_dry_run_handler.py b/tests/component_handlers/helm_wrapper/test_dry_run_handler.py
index 20c02f50d..df44f3e1e 100644
--- a/tests/component_handlers/helm_wrapper/test_dry_run_handler.py
+++ b/tests/component_handlers/helm_wrapper/test_dry_run_handler.py
@@ -15,13 +15,13 @@ class TestDryRunHandler:
     @pytest.fixture
     def helm_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
-            "kpops.component_handlers.helm_wrapper.dry_run_handler.Helm"
+            "kpops.component_handlers.helm_wrapper.dry_run_handler.Helm",
         ).return_value
 
     @pytest.fixture
     def helm_diff_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
-            "kpops.component_handlers.helm_wrapper.dry_run_handler.HelmDiff"
+            "kpops.component_handlers.helm_wrapper.dry_run_handler.HelmDiff",
         ).return_value
     def test_should_print_helm_diff_when_release_is_new(
@@ -42,7 +42,7 @@ def test_should_print_helm_diff_when_release_is_new(
         dry_run_handler.print_helm_diff("A test stdout", "a-release-name", log)
         helm_mock.get_manifest.assert_called_once_with(
-            "a-release-name", "test-namespace"
+            "a-release-name", "test-namespace",
         )
         assert "Helm release a-release-name does not exist" in caplog.text
         mock_load_manifest.assert_called_once_with("A test stdout")
@@ -55,7 +55,7 @@ def test_should_print_helm_diff_when_release_exists(
         caplog: LogCaptureFixture,
     ):
         helm_mock.get_manifest.return_value = iter(
-            [HelmTemplate("path.yaml", {"a": 1})]
+            [HelmTemplate("path.yaml", {"a": 1})],
         )
         mock_load_manifest = mocker.patch(
             "kpops.component_handlers.helm_wrapper.dry_run_handler.Helm.load_manifest",
@@ -67,7 +67,7 @@ def test_should_print_helm_diff_when_release_exists(
         dry_run_handler.print_helm_diff("A test stdout", "a-release-name", log)
         helm_mock.get_manifest.assert_called_once_with(
-            "a-release-name", "test-namespace"
+            "a-release-name", "test-namespace",
         )
         assert "Helm release a-release-name already exists" in caplog.text
         mock_load_manifest.assert_called_once_with("A test stdout")
diff --git a/tests/component_handlers/helm_wrapper/test_helm_diff.py b/tests/component_handlers/helm_wrapper/test_helm_diff.py
index 15a58a023..fc423cf20 100644
--- a/tests/component_handlers/helm_wrapper/test_helm_diff.py
+++ b/tests/component_handlers/helm_wrapper/test_helm_diff.py
@@ -24,7 +24,7 @@ def test_diff():
                 HelmTemplate("a.yaml", {"a": 2}),
                 HelmTemplate("c.yaml", {"c": 1}),
             ],
-        )
+        ),
     ) == [
         Change(
             old_value={"a": 1},
@@ -42,7 +42,7 @@ def test_diff():
     # test no current release
     assert list(
-        helm_diff.calculate_changes((), [HelmTemplate("a.yaml", {"a": 1})])
+        helm_diff.calculate_changes((), [HelmTemplate("a.yaml", {"a": 1})]),
     ) == [
         Change(
             old_value={},
diff --git a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py
index de23dca8e..1afdb7885 100644
--- a/tests/component_handlers/helm_wrapper/test_helm_wrapper.py
+++ b/tests/component_handlers/helm_wrapper/test_helm_wrapper.py
@@ -44,7 +44,7 @@ def mock_get_version(self, mocker: MockerFixture) -> MagicMock:
         return mock_get_version
     def test_should_call_run_command_method_when_helm_install_with_defaults(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
@@ -74,7 +74,7 @@ def test_should_call_run_command_method_when_helm_install_with_defaults(
         )
     def test_should_include_configured_tls_parameters_on_add_when_version_is_old(
-        self, run_command: MagicMock, mocker: MockerFixture
+        self, run_command: MagicMock, mocker: MockerFixture,
     ):
         mock_get_version = mocker.patch.object(Helm, "get_version")
         mock_get_version.return_value = Version(major=3, minor=6, patch=0)
@@ -104,7 +104,7 @@ def test_should_include_configured_tls_parameters_on_add_when_version_is_old(
         ]
     def test_should_include_configured_tls_parameters_on_add_when_version_is_new(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm = Helm(HelmConfig())
@@ -132,7 +132,7 @@ def test_should_include_configured_tls_parameters_on_add_when_version_is_new(
         ]
     def test_should_include_configured_tls_parameters_on_update(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
         helm_wrapper.upgrade_install(
@@ -168,7 +168,7 @@ def test_should_include_configured_tls_parameters_on_update(
         )
     def test_should_call_run_command_method_when_helm_install_with_non_defaults(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
         helm_wrapper.upgrade_install(
@@ -213,7 +213,7 @@ def test_should_call_run_command_method_when_helm_install_with_non_defaults(
         )
     def test_should_call_run_command_method_when_uninstalling_streams_app(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
         helm_wrapper.uninstall(
@@ -240,11 +240,11 @@ def test_should_log_warning_when_release_not_found(
         )
         log_warning_mock.assert_called_once_with(
-            "Release with name test-release not found. Could not uninstall app."
+            "Release with name test-release not found. Could not uninstall app.",
         )
     def test_should_call_run_command_method_when_installing_streams_app__with_dry_run(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
@@ -267,7 +267,7 @@ def test_should_call_run_command_method_when_installing_streams_app__with_dry_ru
     def test_validate_console_output(self):
         with pytest.raises(RuntimeError):
             Helm.parse_helm_command_stderr_output(
-                "A specific\n eRrOr was found in this line"
+                "A specific\n eRrOr was found in this line",
             )
         with pytest.raises(ReleaseNotFoundException):
             Helm.parse_helm_command_stderr_output("New \nmessage\n ReLease: noT foUnD")
@@ -275,13 +275,13 @@ def test_validate_console_output(self):
             Helm.parse_helm_command_stderr_output("This is \njust WaRnIng nothing more")
         except RuntimeError as e:
             pytest.fail(
-                f"validate_console_output() raised RuntimeError unexpectedly!\nError message: {e}"
+                f"validate_console_output() raised RuntimeError unexpectedly!\nError message: {e}",
             )
         try:
             Helm.parse_helm_command_stderr_output("This is \njust WaRnIng nothing more")
         except ReleaseNotFoundException:
             pytest.fail(
-                f"validate_console_output() raised ReleaseNotFoundException unexpectedly!\nError message: {ReleaseNotFoundException}"
+                f"validate_console_output() raised ReleaseNotFoundException unexpectedly!\nError message: {ReleaseNotFoundException}",
             )
     def test_helm_template_load(self):
@@ -294,7 +294,7 @@ def test_helm_template_load(self):
             metadata:
               labels:
                 foo: bar
-            """
+            """,
         )
         helm_template = HelmTemplate.load("test2.yaml", stdout)
@@ -317,7 +317,7 @@ def test_load_manifest_with_no_notes(self):
             ---
             # Source: chart/templates/test3b.yaml
             foo: bar
-            """
+            """,
         )
         helm_templates = list(Helm.load_manifest(stdout))
         assert len(helm_templates) == 2
@@ -334,7 +334,7 @@ def test_raise_parse_error_when_helm_content_is_invalid(self):
             """
             ---
             # Resource: chart/templates/test1.yaml
-            """
+            """,
         )
         with pytest.raises(ParseError, match="Not a valid Helm template source"):
             helm_template = list(Helm.load_manifest(stdout))
@@ -386,7 +386,7 @@ def test_load_manifest(self):
             NOTES:
             test
-            """
+            """,
         )
         helm_templates = list(Helm.load_manifest(stdout))
         assert len(helm_templates) == 2
@@ -399,7 +399,7 @@ def test_load_manifest(self):
         assert helm_templates[1].template == {"foo": "bar"}
     def test_helm_get_manifest(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
         run_command.return_value = dedent(
@@ -409,10 +409,10 @@ def test_helm_get_manifest(
             data:
             - a: 1
            - b: 2
-            """
+            """,
         )
         helm_templates = list(
-            helm_wrapper.get_manifest("test-release", "test-namespace")
+            helm_wrapper.get_manifest("test-release", "test-namespace"),
         )
         run_command.assert_called_once_with(
             command=[
@@ -432,7 +432,7 @@ def test_helm_get_manifest(
         assert helm_wrapper.get_manifest("test-release", "test-namespace") == ()
     def test_should_call_run_command_method_when_helm_template_with_optional_args(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
@@ -470,7 +470,7 @@ def test_should_call_run_command_method_when_helm_template_with_optional_args(
         )
     def test_should_call_run_command_method_when_helm_template_without_optional_args(
-        self, run_command: MagicMock, mock_get_version: MagicMock
+        self, run_command: MagicMock, mock_get_version: MagicMock,
     ):
         helm_wrapper = Helm(helm_config=HelmConfig())
@@ -526,7 +526,7 @@ def test_should_call_helm_version(
         assert helm._version == expected_version
     def test_should_raise_exception_if_helm_version_is_old(
-        self, run_command: MagicMock
+        self, run_command: MagicMock,
     ):
         run_command.return_value = "v2.9.0+gc9f554d"
         with pytest.raises(
@@ -536,10 +536,10 @@ def test_should_raise_exception_if_helm_version_is_old(
             Helm(helm_config=HelmConfig())
     def test_should_raise_exception_if_helm_version_cannot_be_parsed(
-        self, run_command: MagicMock
+        self, run_command: MagicMock,
     ):
         run_command.return_value = "123"
         with pytest.raises(
-            RuntimeError, match="Could not parse the Helm version.\n\nHelm output:\n123"
+            RuntimeError, match="Could not parse the Helm version.\n\nHelm output:\n123",
         ):
             Helm(helm_config=HelmConfig())
diff --git a/tests/component_handlers/helm_wrapper/test_utils.py b/tests/component_handlers/helm_wrapper/test_utils.py
index eef6ca14f..8f40b0c5d 100644
--- a/tests/component_handlers/helm_wrapper/test_utils.py
+++ b/tests/component_handlers/helm_wrapper/test_utils.py
@@ -12,7 +12,7 @@ def test_trim_release_name_with_suffix():
 def test_trim_release_name_without_suffix():
     name = trim_release_name(
-        "example-component-name-too-long-fake-fakefakefakefakefake"
+        "example-component-name-too-long-fake-fakefakefakefakefake",
     )
     assert name == "example-component-name-too-long-fake-fakefakefakefak"
     assert len(name) == 52
diff --git a/tests/component_handlers/kafka_connect/test_connect_handler.py b/tests/component_handlers/kafka_connect/test_connect_handler.py
index a5a1f3246..ff6b7068c 100644
--- a/tests/component_handlers/kafka_connect/test_connect_handler.py
+++ b/tests/component_handlers/kafka_connect/test_connect_handler.py
@@ -25,25 +25,25 @@ class TestConnectorHandler:
     @pytest.fixture
     def log_info_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
-            "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.info"
+            "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.info",
         )
     @pytest.fixture
     def log_warning_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
-            "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.warning"
+            "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.warning",
         )
     @pytest.fixture
     def log_error_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
-            "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.error"
+            "kpops.component_handlers.kafka_connect.kafka_connect_handler.log.error",
         )
     @pytest.fixture
     def renderer_diff_mock(self, mocker: MockerFixture) -> MagicMock:
         return mocker.patch(
-            "kpops.component_handlers.kafka_connect.kafka_connect_handler.render_diff"
+            "kpops.component_handlers.kafka_connect.kafka_connect_handler.render_diff",
         )
     @staticmethod
@@ -59,7 +59,7 @@ def connector_config(self) -> KafkaConnectorConfig:
             **{
                 "connector.class": "com.bakdata.connect.TestConnector",
                 "name": CONNECTOR_NAME,
-            }
+            },
         )
     def test_should_create_connector_in_dry_run(
@@ -75,15 +75,15 @@
         handler.create_connector(connector_config, dry_run=True)
         connector_wrapper.get_connector.assert_called_once_with(CONNECTOR_NAME)
         connector_wrapper.validate_connector_config.assert_called_once_with(
-            connector_config
+            connector_config,
         )
         assert log_info_mock.mock_calls == [
             mock.call.log_info(
-                f"Connector Creation: connector {CONNECTOR_NAME} already exists."
+                f"Connector Creation: connector {CONNECTOR_NAME} already exists.",
             ),
             mock.call.log_info(
-                f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!"
+                f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!",
             ),
         ]
@@ -109,10 +109,10 @@ def test_should_log_correct_message_when_create_connector_and_connector_not_exis
         assert log_info_mock.mock_calls == [
             mock.call(
-                f"Connector Creation: connector {CONNECTOR_NAME} does not exist. Creating connector with config:\n\x1b[32m+ connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n\x1b[0m\x1b[32m+ name: {CONNECTOR_NAME}\n\x1b[0m\x1b[32m+ tasks.max: '1'\n\x1b[0m\x1b[32m+ topics: {TOPIC_NAME}\n\x1b[0m"
+                f"Connector Creation: connector {CONNECTOR_NAME} does not exist. Creating connector with config:\n\x1b[32m+ connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n\x1b[0m\x1b[32m+ name: {CONNECTOR_NAME}\n\x1b[0m\x1b[32m+ tasks.max: '1'\n\x1b[0m\x1b[32m+ topics: {TOPIC_NAME}\n\x1b[0m",
             ),
             mock.call(
-                f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!"
+                f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!",
             ),
         ]
@@ -134,7 +134,7 @@ def test_should_log_correct_message_when_create_connector_and_connector_exists_i
             "tasks": [],
         }
         connector_wrapper.get_connector.return_value = KafkaConnectResponse(
-            **actual_response
+            **actual_response,
         )
         configs = {
@@ -147,23 +147,23 @@
         handler.create_connector(connector_config, dry_run=True)
         connector_wrapper.get_connector.assert_called_once_with(CONNECTOR_NAME)
         connector_wrapper.validate_connector_config.assert_called_once_with(
-            connector_config
+            connector_config,
         )
         assert log_info_mock.mock_calls == [
             mock.call(
-                f"Connector Creation: connector {CONNECTOR_NAME} already exists."
+                f"Connector Creation: connector {CONNECTOR_NAME} already exists.",
             ),
             mock.call(
-                f"Updating config:\n connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n name: {CONNECTOR_NAME}\n\x1b[31m- tasks.max: '1'\n\x1b[0m\x1b[33m? ^\n\x1b[0m\x1b[32m+ tasks.max: '2'\n\x1b[0m\x1b[33m? ^\n\x1b[0m topics: {TOPIC_NAME}\n"
+                f"Updating config:\n connector.class: org.apache.kafka.connect.file.FileStreamSinkConnector\n name: {CONNECTOR_NAME}\n\x1b[31m- tasks.max: '1'\n\x1b[0m\x1b[33m? ^\n\x1b[0m\x1b[32m+ tasks.max: '2'\n\x1b[0m\x1b[33m? ^\n\x1b[0m topics: {TOPIC_NAME}\n",
             ),
             mock.call(
-                f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!"
+ f"Connector Creation: connector config for {CONNECTOR_NAME} is valid!", ), ] def test_should_log_invalid_config_when_create_connector_in_dry_run( - self, connector_config: KafkaConnectorConfig, renderer_diff_mock: MagicMock + self, connector_config: KafkaConnectorConfig, renderer_diff_mock: MagicMock, ): connector_wrapper = MagicMock() @@ -184,11 +184,11 @@ def test_should_log_invalid_config_when_create_connector_in_dry_run( handler.create_connector(connector_config, dry_run=True) connector_wrapper.validate_connector_config.assert_called_once_with( - connector_config + connector_config, ) def test_should_call_update_connector_config_when_connector_exists_not_dry_run( - self, connector_config: KafkaConnectorConfig + self, connector_config: KafkaConnectorConfig, ): connector_wrapper = MagicMock() handler = self.connector_handler(connector_wrapper) @@ -201,7 +201,7 @@ def test_should_call_update_connector_config_when_connector_exists_not_dry_run( ] def test_should_call_create_connector_when_connector_does_not_exists_not_dry_run( - self, connector_config: KafkaConnectorConfig + self, connector_config: KafkaConnectorConfig, ): connector_wrapper = MagicMock() @@ -224,8 +224,8 @@ def test_should_print_correct_log_when_destroying_connector_in_dry_run( log_info_mock.assert_called_once_with( magentaify( - f"Connector Destruction: connector {CONNECTOR_NAME} already exists. Deleting connector." - ) + f"Connector Destruction: connector {CONNECTOR_NAME} already exists. Deleting connector.", + ), ) def test_should_print_correct_warning_log_when_destroying_connector_and_connector_exists_in_dry_run( @@ -240,7 +240,7 @@ def test_should_print_correct_warning_log_when_destroying_connector_and_connecto handler.destroy_connector(CONNECTOR_NAME, dry_run=True) log_warning_mock.assert_called_once_with( - f"Connector Destruction: connector {CONNECTOR_NAME} does not exist and cannot be deleted. Skipping." + f"Connector Destruction: connector {CONNECTOR_NAME} does not exist and cannot be deleted. Skipping.", ) def test_should_call_delete_connector_when_destroying_existing_connector_not_dry_run( @@ -267,5 +267,5 @@ def test_should_print_correct_warning_log_when_destroying_connector_and_connecto handler.destroy_connector(CONNECTOR_NAME, dry_run=False) log_warning_mock.assert_called_once_with( - f"Connector Destruction: the connector {CONNECTOR_NAME} does not exist. Skipping." + f"Connector Destruction: the connector {CONNECTOR_NAME} does not exist. 
Skipping.", ) diff --git a/tests/component_handlers/kafka_connect/test_connect_wrapper.py b/tests/component_handlers/kafka_connect/test_connect_wrapper.py index 3db9c090f..d4dd13664 100644 --- a/tests/component_handlers/kafka_connect/test_connect_wrapper.py +++ b/tests/component_handlers/kafka_connect/test_connect_wrapper.py @@ -40,7 +40,7 @@ def connector_config(self) -> KafkaConnectorConfig: **{ "connector.class": "com.bakdata.connect.TestConnector", "name": "test-connector", - } + }, ) def test_should_through_exception_when_host_is_not_set(self): @@ -58,7 +58,7 @@ def test_should_through_exception_when_host_is_not_set(self): @patch("httpx.post") def test_should_create_post_requests_for_given_connector_configuration( - self, mock_post: MagicMock + self, mock_post: MagicMock, ): configs = { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", @@ -84,7 +84,7 @@ def test_should_create_post_requests_for_given_connector_configuration( ) def test_should_return_correct_response_when_connector_created( - self, httpx_mock: HTTPXMock, connector_config: KafkaConnectorConfig + self, httpx_mock: HTTPXMock, connector_config: KafkaConnectorConfig, ): actual_response = { "name": "hdfs-sink-connector", @@ -135,7 +135,7 @@ def test_should_raise_connector_exists_exception_when_connector_exists( ) log_warning.assert_called_with( - "Rebalancing in progress while creating a connector... Retrying..." + "Rebalancing in progress while creating a connector... Retrying...", ) @patch("httpx.get") @@ -152,7 +152,7 @@ def test_should_create_correct_get_connector_request(self, mock_get: MagicMock): @pytest.mark.flaky(reruns=5, condition=sys.platform.startswith("win32")) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_return_correct_response_when_getting_connector( - self, log_info: MagicMock, httpx_mock: HTTPXMock + self, log_info: MagicMock, httpx_mock: HTTPXMock, ): connector_name = "test-connector" @@ -187,7 +187,7 @@ def test_should_return_correct_response_when_getting_connector( @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_raise_connector_not_found_when_getting_connector( - self, log_info: MagicMock, httpx_mock: HTTPXMock + self, log_info: MagicMock, httpx_mock: HTTPXMock, ): connector_name = "test-connector" @@ -202,12 +202,12 @@ def test_should_raise_connector_not_found_when_getting_connector( self.connect_wrapper.get_connector(connector_name) log_info.assert_called_once_with( - f"The named connector {connector_name} does not exists." + f"The named connector {connector_name} does not exists.", ) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_raise_rebalance_in_progress_when_getting_connector( - self, log_warning: MagicMock, httpx_mock: HTTPXMock + self, log_warning: MagicMock, httpx_mock: HTTPXMock, ): connector_name = "test-connector" @@ -225,7 +225,7 @@ def test_should_raise_rebalance_in_progress_when_getting_connector( ) log_warning.assert_called_with( - "Rebalancing in progress while getting a connector... Retrying..." + "Rebalancing in progress while getting a connector... 
Retrying...", ) @patch("httpx.put") @@ -243,7 +243,7 @@ def test_should_create_correct_update_connector_request(self, mock_put: MagicMoc } with pytest.raises(KafkaConnectError): self.connect_wrapper.update_connector_config( - KafkaConnectorConfig(**configs) + KafkaConnectorConfig(**configs), ) mock_put.assert_called_with( @@ -287,11 +287,11 @@ def test_should_return_correct_response_when_update_connector( status_code=200, ) expected_response = self.connect_wrapper.update_connector_config( - connector_config + connector_config, ) assert KafkaConnectResponse(**actual_response) == expected_response log_info.assert_called_once_with( - f"Config for connector {connector_name} updated." + f"Config for connector {connector_name} updated.", ) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") @@ -329,7 +329,7 @@ def test_should_return_correct_response_when_update_connector_created( status_code=201, ) expected_response = self.connect_wrapper.update_connector_config( - connector_config + connector_config, ) assert KafkaConnectResponse(**actual_response) == expected_response log_info.assert_called_once_with(f"Connector {connector_name} created.") @@ -357,12 +357,12 @@ def test_should_raise_connector_exists_exception_when_update_connector( ) log_warning.assert_called_with( - "Rebalancing in progress while updating a connector... Retrying..." + "Rebalancing in progress while updating a connector... Retrying...", ) @patch("httpx.delete") def test_should_create_correct_delete_connector_request( - self, mock_delete: MagicMock + self, mock_delete: MagicMock, ): connector_name = "test-connector" with pytest.raises(KafkaConnectError): @@ -375,7 +375,7 @@ def test_should_create_correct_delete_connector_request( @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_return_correct_response_when_deleting_connector( - self, log_info: MagicMock, httpx_mock: HTTPXMock + self, log_info: MagicMock, httpx_mock: HTTPXMock, ): connector_name = "test-connector" @@ -410,7 +410,7 @@ def test_should_return_correct_response_when_deleting_connector( @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.info") def test_should_raise_connector_not_found_when_deleting_connector( - self, log_info: MagicMock, httpx_mock: HTTPXMock + self, log_info: MagicMock, httpx_mock: HTTPXMock, ): connector_name = "test-connector" @@ -425,12 +425,12 @@ def test_should_raise_connector_not_found_when_deleting_connector( self.connect_wrapper.delete_connector(connector_name) log_info.assert_called_once_with( - f"The named connector {connector_name} does not exists." + f"The named connector {connector_name} does not exists.", ) @patch("kpops.component_handlers.kafka_connect.connect_wrapper.log.warning") def test_should_raise_rebalance_in_progress_when_deleting_connector( - self, log_warning: MagicMock, httpx_mock: HTTPXMock + self, log_warning: MagicMock, httpx_mock: HTTPXMock, ): connector_name = "test-connector" @@ -448,12 +448,12 @@ def test_should_raise_rebalance_in_progress_when_deleting_connector( ) log_warning.assert_called_with( - "Rebalancing in progress while deleting a connector... Retrying..." + "Rebalancing in progress while deleting a connector... 
Retrying...", ) @patch("httpx.put") def test_should_create_correct_validate_connector_config_request( - self, mock_put: MagicMock + self, mock_put: MagicMock, ): connector_config = KafkaConnectorConfig( **{ @@ -461,7 +461,7 @@ def test_should_create_correct_validate_connector_config_request( "name": "FileStreamSinkConnector", "tasks.max": "1", "topics": "test-topic", - } + }, ) with pytest.raises(KafkaConnectError): self.connect_wrapper.validate_connector_config(connector_config) @@ -474,7 +474,7 @@ def test_should_create_correct_validate_connector_config_request( @patch("httpx.put") def test_should_create_correct_validate_connector_config_and_name_gets_added( - self, mock_put: MagicMock + self, mock_put: MagicMock, ): connector_name = "FileStreamSinkConnector" configs = { @@ -485,7 +485,7 @@ def test_should_create_correct_validate_connector_config_and_name_gets_added( } with pytest.raises(KafkaConnectError): self.connect_wrapper.validate_connector_config( - KafkaConnectorConfig(**configs) + KafkaConnectorConfig(**configs), ) mock_put.assert_called_with( @@ -514,9 +514,9 @@ def test_should_parse_validate_connector_config(self, httpx_mock: HTTPXMock): "topics": "test-topic", } errors = self.connect_wrapper.validate_connector_config( - KafkaConnectorConfig(**configs) + KafkaConnectorConfig(**configs), ) assert errors == [ - "Found error for field file: Missing required configuration 'file' which has no default value." + "Found error for field file: Missing required configuration 'file' which has no default value.", ] diff --git a/tests/component_handlers/schema_handler/resources/module.py b/tests/component_handlers/schema_handler/resources/module.py index 8c7168efa..4223179d3 100644 --- a/tests/component_handlers/schema_handler/resources/module.py +++ b/tests/component_handlers/schema_handler/resources/module.py @@ -9,6 +9,6 @@ class CustomSchemaProvider(SchemaProvider): def provide_schema( - self, schema_class: str, models: dict[ModelName, ModelVersion] + self, schema_class: str, models: dict[ModelName, ModelVersion], ) -> Schema: return AvroSchema({}) diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py index ccea021c6..bd5815b12 100644 --- a/tests/component_handlers/schema_handler/test_schema_handler.py +++ b/tests/component_handlers/schema_handler/test_schema_handler.py @@ -28,28 +28,28 @@ @pytest.fixture(autouse=True) def log_info_mock(mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.log.info" + "kpops.component_handlers.schema_handler.schema_handler.log.info", ) @pytest.fixture(autouse=True) def log_debug_mock(mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.log.debug" + "kpops.component_handlers.schema_handler.schema_handler.log.debug", ) @pytest.fixture(autouse=False) def find_class_mock(mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.find_class" + "kpops.component_handlers.schema_handler.schema_handler.find_class", ) @pytest.fixture(autouse=True) def schema_registry_mock(mocker: MockerFixture) -> MagicMock: schema_registry_mock = mocker.patch( - "kpops.component_handlers.schema_handler.schema_handler.SchemaRegistryClient" + "kpops.component_handlers.schema_handler.schema_handler.SchemaRegistryClient", ) return schema_registry_mock.return_value @@ -96,16 +96,16 @@ def 
test_should_lazy_load_schema_provider(find_class_mock: MagicMock): schema_registry_url="http://localhost:8081", ) schema_handler = SchemaHandler.load_schema_handler( - TEST_SCHEMA_PROVIDER_MODULE, config_enable + TEST_SCHEMA_PROVIDER_MODULE, config_enable, ) assert schema_handler is not None schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", {} + "com.bakdata.kpops.test.SchemaHandlerTest", {}, ) schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SomeOtherSchemaClass", {} + "com.bakdata.kpops.test.SomeOtherSchemaClass", {}, ) find_class_mock.assert_called_once_with(TEST_SCHEMA_PROVIDER_MODULE, SchemaProvider) @@ -113,12 +113,12 @@ def test_should_lazy_load_schema_provider(find_class_mock: MagicMock): def test_should_raise_value_error_if_schema_provider_class_not_found(): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=NON_EXISTING_PROVIDER_MODULE + url="http://mock:8081", components_module=NON_EXISTING_PROVIDER_MODULE, ) with pytest.raises(ValueError) as value_error: schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", {} + "com.bakdata.kpops.test.SchemaHandlerTest", {}, ) assert ( @@ -140,14 +140,14 @@ def test_should_raise_value_error_when_schema_provider_is_called_and_components_ schema_handler = SchemaHandler.load_schema_handler(None, config_enable) assert schema_handler is not None schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", {} + "com.bakdata.kpops.test.SchemaHandlerTest", {}, ) with pytest.raises(ValueError) as value_error: schema_handler = SchemaHandler.load_schema_handler("", config_enable) assert schema_handler is not None schema_handler.schema_provider.provide_schema( - "com.bakdata.kpops.test.SchemaHandlerTest", {} + "com.bakdata.kpops.test.SchemaHandlerTest", {}, ) assert ( @@ -157,10 +157,10 @@ def test_should_raise_value_error_when_schema_provider_is_called_and_components_ def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( - to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock + to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] @@ -168,7 +168,7 @@ def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( schema_handler.submit_schemas(to_section, True) log_info_mock.assert_called_once_with( - greenify("Schema Submission: The subject topic-X-value will be submitted.") + greenify("Schema Submission: The subject topic-X-value will be submitted."), ) schema_registry_mock.register.assert_not_called() @@ -180,7 +180,7 @@ def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [1, 2, 3] @@ -190,7 +190,7 @@ def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( schema_handler.submit_schemas(to_section, True) log_info_mock.assert_called_once_with( - f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}." 
+ f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}.", ) schema_registry_mock.register.assert_not_called() @@ -202,7 +202,7 @@ def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatibl ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" @@ -242,7 +242,7 @@ def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_ve ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" schema = schema_provider.provide_schema(schema_class, {}) @@ -255,13 +255,13 @@ def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_ve assert log_info_mock.mock_calls == [ mock.call( - f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}." + f"Schema Submission: compatible schema for topic-X-value with model {topic_config.value_schema}.", ), ] assert log_debug_mock.mock_calls == [ mock.call( - f"Schema Submission: schema was already submitted for the subject topic-X-value as version {registered_version.schema}. Therefore, the specified schema must be compatible." + f"Schema Submission: schema was already submitted for the subject topic-X-value as version {registered_version.schema}. Therefore, the specified schema must be compatible.", ), ] @@ -278,7 +278,7 @@ def test_should_submit_non_existing_schema_when_not_dry( schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" schema = schema_provider.provide_schema(schema_class, {}) schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] @@ -287,12 +287,12 @@ def test_should_submit_non_existing_schema_when_not_dry( subject = "topic-X-value" log_info_mock.assert_called_once_with( - f"Schema Submission: schema submitted for {subject} with model {topic_config.value_schema}." 
+ f"Schema Submission: schema submitted for {subject} with model {topic_config.value_schema}.", ) schema_registry_mock.get_versions.assert_not_called() schema_registry_mock.register.assert_called_once_with( - subject=subject, schema=schema + subject=subject, schema=schema, ) @@ -302,7 +302,7 @@ def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] @@ -310,17 +310,17 @@ def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( schema_handler.delete_schemas(to_section, True) log_info_mock.assert_called_once_with( - magentaify("Schema Deletion: will delete subject topic-X-value.") + magentaify("Schema Deletion: will delete subject topic-X-value."), ) schema_registry_mock.delete_subject.assert_not_called() def test_should_delete_schemas_when_not_in_dry_run( - to_section: ToSection, schema_registry_mock: MagicMock + to_section: ToSection, schema_registry_mock: MagicMock, ): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] diff --git a/tests/component_handlers/topic/test_proxy_wrapper.py b/tests/component_handlers/topic/test_proxy_wrapper.py index 7b587ecb3..c8fb3e94e 100644 --- a/tests/component_handlers/topic/test_proxy_wrapper.py +++ b/tests/component_handlers/topic/test_proxy_wrapper.py @@ -32,12 +32,12 @@ def log_debug_mock(self, mocker: MockerFixture) -> MagicMock: @pytest.fixture(autouse=True) def setup(self, httpx_mock: HTTPXMock): config = PipelineConfig( - defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST + defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST, ) self.proxy_wrapper = ProxyWrapper(pipeline_config=config) with open( - DEFAULTS_PATH / "kafka_rest_proxy_responses" / "cluster-info.json" + DEFAULTS_PATH / "kafka_rest_proxy_responses" / "cluster-info.json", ) as f: cluster_response = json.load(f) @@ -62,7 +62,7 @@ def test_should_raise_exception_when_host_is_not_set(self): @patch("httpx.post") def test_should_create_topic_with_all_topic_configuration( - self, mock_post: MagicMock + self, mock_post: MagicMock, ): topic_spec = { "topic_name": "topic-X", @@ -128,7 +128,7 @@ def test_should_call_batch_alter_topic_config(self, mock_put: MagicMock): "data": [ {"name": "cleanup.policy", "operation": "DELETE"}, {"name": "compression.type", "value": "gzip"}, - ] + ], }, ) @@ -155,7 +155,7 @@ def test_should_call_get_broker_config(self, mock_get: MagicMock): ) def test_should_log_topic_creation( - self, log_info_mock: MagicMock, httpx_mock: HTTPXMock + self, log_info_mock: MagicMock, httpx_mock: HTTPXMock, ): topic_spec = { "topic_name": "topic-X", @@ -178,7 +178,7 @@ def test_should_log_topic_creation( log_info_mock.assert_called_once_with("Topic topic-X created.") def test_should_log_topic_deletion( - self, log_info_mock: MagicMock, httpx_mock: HTTPXMock + self, log_info_mock: MagicMock, httpx_mock: HTTPXMock, ): topic_name = "topic-X" @@ -225,7 +225,7 @@ def test_should_get_topic(self, log_debug_mock: MagicMock, httpx_mock: HTTPXMock assert get_topic_response == topic_response def test_should_rais_topic_not_found_exception_get_topic( - self, log_debug_mock: 
MagicMock, httpx_mock: HTTPXMock + self, log_debug_mock: MagicMock, httpx_mock: HTTPXMock, ): topic_name = "topic-X" @@ -244,7 +244,7 @@ def test_should_rais_topic_not_found_exception_get_topic( log_debug_mock.assert_any_call("Topic topic-X not found.") def test_should_log_reset_default_topic_config_when_deleted( - self, log_info_mock: MagicMock, httpx_mock: HTTPXMock + self, log_info_mock: MagicMock, httpx_mock: HTTPXMock, ): topic_name = "topic-X" config_name = "cleanup.policy" @@ -263,5 +263,5 @@ def test_should_log_reset_default_topic_config_when_deleted( ) log_info_mock.assert_called_once_with( - f"Config of topic {topic_name} was altered." + f"Config of topic {topic_name} was altered.", ) diff --git a/tests/component_handlers/topic/test_topic_handler.py b/tests/component_handlers/topic/test_topic_handler.py index c53a7a60d..a64a239a9 100644 --- a/tests/component_handlers/topic/test_topic_handler.py +++ b/tests/component_handlers/topic/test_topic_handler.py @@ -52,17 +52,17 @@ def log_error_mock(self, mocker: MockerFixture) -> MagicMock: @pytest.fixture(autouse=True) def get_topic_response_mock(self) -> MagicMock: with open( - DEFAULTS_PATH / "kafka_rest_proxy_responses/get_topic_response.json" + DEFAULTS_PATH / "kafka_rest_proxy_responses/get_topic_response.json", ) as f: response = json.load(f) with open( - DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json" + DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json", ) as f: broker_response = json.load(f) with open( - DEFAULTS_PATH / "kafka_rest_proxy_responses/topic_config_response.json" + DEFAULTS_PATH / "kafka_rest_proxy_responses/topic_config_response.json", ) as f: response_topic_config = json.load(f) @@ -70,19 +70,19 @@ def get_topic_response_mock(self) -> MagicMock: wrapper.get_topic.return_value = TopicResponse(**response) wrapper.get_broker_config.return_value = BrokerConfigResponse(**broker_response) wrapper.get_topic_config.return_value = TopicConfigResponse( - **response_topic_config + **response_topic_config, ) return wrapper @pytest.fixture(autouse=True) def get_default_topic_response_mock(self) -> MagicMock: with open( - DEFAULTS_PATH / "kafka_rest_proxy_responses/get_default_topic_response.json" + DEFAULTS_PATH / "kafka_rest_proxy_responses/get_default_topic_response.json", ) as f: response = json.load(f) with open( - DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json" + DEFAULTS_PATH / "kafka_rest_proxy_responses/broker_response.json", ) as f: broker_response = json.load(f) @@ -120,7 +120,7 @@ def test_should_call_create_topic_with_dry_run_false(self): wrapper.__dry_run_topic_creation.assert_not_called() def test_should_call_update_topic_config_when_topic_exists_and_with_dry_run_false( - self, get_topic_response_mock: MagicMock + self, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock topic_handler = TopicHandler(proxy_wrapper=wrapper) @@ -146,7 +146,7 @@ def test_should_call_update_topic_config_when_topic_exists_and_with_dry_run_fals wrapper.__dry_run_topic_creation.assert_not_called() def test_should_update_topic_config_when_one_config_changed( - self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock + self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock @@ -168,7 +168,7 @@ def test_should_update_topic_config_when_one_config_changed( ) def test_should_not_update_topic_config_when_config_not_changed( - self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock + self, log_info_mock: 
MagicMock, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock @@ -186,11 +186,11 @@ def test_should_not_update_topic_config_when_config_not_changed( wrapper.batch_alter_topic_config.assert_not_called() log_info_mock.assert_called_once_with( - "Topic Creation: config of topic topic-X didn't change. Skipping update." + "Topic Creation: config of topic topic-X didn't change. Skipping update.", ) def test_should_not_update_topic_config_when_config_not_changed_and_not_ordered( - self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock + self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock topic_handler = TopicHandler(proxy_wrapper=wrapper) @@ -207,11 +207,11 @@ def test_should_not_update_topic_config_when_config_not_changed_and_not_ordered( wrapper.batch_alter_topic_config.assert_not_called() log_info_mock.assert_called_once_with( - "Topic Creation: config of topic topic-X didn't change. Skipping update." + "Topic Creation: config of topic topic-X didn't change. Skipping update.", ) def test_should_call_reset_topic_config_when_topic_exists_dry_run_false_and_topic_configs_change( - self, get_topic_response_mock: MagicMock + self, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock @@ -251,7 +251,7 @@ def test_should_not_call_create_topics_with_dry_run_true_and_topic_not_exists(se wrapper.create_topic.assert_not_called() def test_should_print_message_with_dry_run_true_and_topic_not_exists( - self, log_info_mock: MagicMock + self, log_info_mock: MagicMock, ): wrapper = MagicMock() wrapper.get_topic.side_effect = TopicNotFoundException() @@ -271,8 +271,8 @@ def test_should_print_message_with_dry_run_true_and_topic_not_exists( log_info_mock.assert_called_once_with( greenify( - "Topic Creation: topic-X does not exist in the cluster. Creating topic." - ) + "Topic Creation: topic-X does not exist in the cluster. Creating topic.", + ), ) def test_should_print_message_if_dry_run_and_topic_exists_with_same_partition_count_and_replication_factor( @@ -295,19 +295,19 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_same_partition_co topic_handler.create_topics(to_section=to_section, dry_run=True) wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff assert log_info_mock.mock_calls == [ - mock.call("Topic Creation: topic-X already exists in cluster.") + mock.call("Topic Creation: topic-X already exists in cluster."), ] assert log_debug_mock.mock_calls == [ mock.call("HTTP/1.1 400 Bad Request"), mock.call({"Content-Type": "application/json"}), mock.call( - {"error_code": 40002, "message": "Topic 'topic-X' already exists."} + {"error_code": 40002, "message": "Topic 'topic-X' already exists."}, ), mock.call( - "Topic Creation: partition count of topic topic-X did not change. Current partitions count 10. Updating configs." + "Topic Creation: partition count of topic topic-X did not change. Current partitions count 10. Updating configs.", ), mock.call( - "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 3. Updating configs." + "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 3. 
Updating configs.", ), ] @@ -331,7 +331,7 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_default_partition assert log_info_mock.mock_calls == [ mock.call("Config changes for topic topic-X:"), mock.call( - "\n\x1b[32m+ cleanup.policy: compact\n\x1b[0m\x1b[32m+ compression.type: gzip\n\x1b[0m" + "\n\x1b[32m+ cleanup.policy: compact\n\x1b[0m\x1b[32m+ compression.type: gzip\n\x1b[0m", ), mock.call("Topic Creation: topic-X already exists in cluster."), ] @@ -339,18 +339,18 @@ def test_should_print_message_if_dry_run_and_topic_exists_with_default_partition mock.call("HTTP/1.1 400 Bad Request"), mock.call({"Content-Type": "application/json"}), mock.call( - {"error_code": 40002, "message": "Topic 'topic-X' already exists."} + {"error_code": 40002, "message": "Topic 'topic-X' already exists."}, ), mock.call( - "Topic Creation: partition count of topic topic-X did not change. Current partitions count 1. Updating configs." + "Topic Creation: partition count of topic topic-X did not change. Current partitions count 1. Updating configs.", ), mock.call( - "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 1. Updating configs." + "Topic Creation: replication factor of topic topic-X did not change. Current replication factor 1. Updating configs.", ), ] def test_should_exit_if_dry_run_and_topic_exists_different_partition_count( - self, get_topic_response_mock: MagicMock + self, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock @@ -372,7 +372,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_partition_count( wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff def test_should_exit_if_dry_run_and_topic_exists_different_replication_factor( - self, get_topic_response_mock: MagicMock + self, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock @@ -394,7 +394,7 @@ def test_should_exit_if_dry_run_and_topic_exists_different_replication_factor( wrapper.get_topic_config.assert_called_once() # dry run requests the config to create the diff def test_should_log_correct_message_when_delete_existing_topic_dry_run( - self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock + self, log_info_mock: MagicMock, get_topic_response_mock: MagicMock, ): wrapper = get_topic_response_mock @@ -413,12 +413,12 @@ def test_should_log_correct_message_when_delete_existing_topic_dry_run( wrapper.get_topic.assert_called_once_with(topic_name="topic-X") log_info_mock.assert_called_once_with( magentaify( - "Topic Deletion: topic topic-X exists in the cluster. Deleting topic." - ) + "Topic Deletion: topic topic-X exists in the cluster. Deleting topic.", + ), ) def test_should_log_correct_message_when_delete_non_existing_topic_dry_run( - self, log_warning_mock: MagicMock + self, log_warning_mock: MagicMock, ): wrapper = MagicMock() wrapper.get_topic.side_effect = TopicNotFoundException @@ -437,7 +437,7 @@ def test_should_log_correct_message_when_delete_non_existing_topic_dry_run( wrapper.get_topic.assert_called_once_with(topic_name="topic-X") log_warning_mock.assert_called_once_with( - "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. Skipping." + "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. 
Skipping.", ) def test_should_call_delete_topic_not_dry_run(self): @@ -460,7 +460,7 @@ def test_should_call_delete_topic_not_dry_run(self): ] def test_should_print_correct_warning_when_deleting_topic_that_does_not_exists_not_dry_run( - self, log_warning_mock: MagicMock + self, log_warning_mock: MagicMock, ): wrapper = MagicMock() topic_handler = TopicHandler(proxy_wrapper=wrapper) @@ -478,5 +478,5 @@ def test_should_print_correct_warning_when_deleting_topic_that_does_not_exists_n wrapper.get_topic.assert_called_once_with(topic_name="topic-X") log_warning_mock.assert_called_once_with( - "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. Skipping." + "Topic Deletion: topic topic-X does not exist in the cluster and cannot be deleted. Skipping.", ) diff --git a/tests/component_handlers/topic/test_utils.py b/tests/component_handlers/topic/test_utils.py index b5f0133ca..0d3bd1170 100644 --- a/tests/component_handlers/topic/test_utils.py +++ b/tests/component_handlers/topic/test_utils.py @@ -86,7 +86,7 @@ "name": "log.flush.interval.messages", "source": "DEFAULT_CONFIG", "value": "9223372036854775807", - } + }, ], "topic_name": "fake", "value": "9223372036854775807", @@ -108,7 +108,7 @@ "name": "flush.ms", "source": "DEFAULT_CONFIG", "value": "9223372036854775807", - } + }, ], "topic_name": "fake", "value": "9223372036854775807", @@ -247,7 +247,7 @@ def test_get_effective_config(): ], }, ], - } + }, ) effective_config = get_effective_config( diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index 7b25e5f74..6e3e3d570 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -77,7 +77,7 @@ class TestBaseDefaultsComponent: ], ) def test_load_defaults( - self, component_class: type[BaseDefaultsComponent], defaults: dict + self, component_class: type[BaseDefaultsComponent], defaults: dict, ): assert ( load_defaults(component_class, DEFAULTS_PATH / "defaults.yaml") == defaults @@ -105,7 +105,7 @@ def test_load_defaults( ], ) def test_load_defaults_with_environment( - self, component_class: type[BaseDefaultsComponent], defaults: dict + self, component_class: type[BaseDefaultsComponent], defaults: dict, ): assert ( load_defaults( @@ -117,7 +117,7 @@ def test_load_defaults_with_environment( ) def test_inherit_defaults( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ): component = Child(config=config, handlers=handlers) @@ -125,7 +125,7 @@ def test_inherit_defaults( component.name == "fake-child-name" ), "Child default should overwrite parent default" assert component.nice == { - "fake-value": "fake" + "fake-value": "fake", }, "Field introduce by child should be added" assert ( component.value == 2.0 @@ -148,7 +148,7 @@ def test_inherit(self, config: PipelineConfig, handlers: ComponentHandlers): component.name == "name-defined-in-pipeline_generator" ), "Kwargs should should overwrite all other values" assert component.nice == { - "fake-value": "fake" + "fake-value": "fake", }, "Field introduce by child should be added" assert ( component.value == 2.0 @@ -161,7 +161,7 @@ def test_inherit(self, config: PipelineConfig, handlers: ComponentHandlers): ), "Defaults in code should be kept for parents" def test_multiple_generations( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ): component = 
GrandChild(config=config, handlers=handlers) @@ -169,7 +169,7 @@ def test_multiple_generations( component.name == "fake-child-name" ), "Child default should overwrite parent default" assert component.nice == { - "fake-value": "fake" + "fake-value": "fake", }, "Field introduce by child should be added" assert ( component.value == 2.0 @@ -183,11 +183,11 @@ def test_multiple_generations( assert component.grand_child == "grand-child-value" def test_env_var_substitution( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ): ENV["pipeline_name"] = str(DEFAULTS_PATH) component = EnvVarTest(config=config, handlers=handlers) assert component.name == str( - DEFAULTS_PATH + DEFAULTS_PATH, ), "Environment variables should be substituted" diff --git a/tests/components/test_kafka_app.py b/tests/components/test_kafka_app.py index c6527c00c..aae796153 100644 --- a/tests/components/test_kafka_app.py +++ b/tests/components/test_kafka_app.py @@ -80,7 +80,7 @@ def test_should_deploy_kafka_app( ) helm_upgrade_install = mocker.patch.object(kafka_app.helm, "upgrade_install") print_helm_diff = mocker.patch.object( - kafka_app.dry_run_handler, "print_helm_diff" + kafka_app.dry_run_handler, "print_helm_diff", ) mocker.patch.object( KafkaApp, diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index 912f449fb..e22d26d52 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -42,13 +42,13 @@ def handlers(self) -> ComponentHandlers: @pytest.fixture(autouse=True) def helm_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.components.base_components.kafka_connector.Helm" + "kpops.components.base_components.kafka_connector.Helm", ).return_value @pytest.fixture def dry_run_handler(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.components.base_components.kafka_connector.DryRunHandler" + "kpops.components.base_components.kafka_connector.DryRunHandler", ).return_value @pytest.fixture @@ -57,7 +57,7 @@ def connector_config(self) -> KafkaConnectorConfig: **{ "connector.class": CONNECTOR_CLASS, "name": CONNECTOR_FULL_NAME, - } + }, ) def test_connector_config_name_override( @@ -85,7 +85,7 @@ def test_connector_config_name_override( assert connector.app.name == CONNECTOR_FULL_NAME with pytest.raises( - ValueError, match="Connector name should be the same as component name" + ValueError, match="Connector name should be the same as component name", ): KafkaConnector( name=CONNECTOR_NAME, @@ -96,7 +96,7 @@ def test_connector_config_name_override( ) with pytest.raises( - ValueError, match="Connector name should be the same as component name" + ValueError, match="Connector name should be the same as component name", ): KafkaConnector( name=CONNECTOR_NAME, diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 91760e90c..a0650c633 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -55,9 +55,9 @@ def connector( to=ToSection( topics={ TopicName("${output_topic_name}"): TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), - } + }, ), ) @@ -73,7 +73,7 @@ def test_connector_config_parsing( config=config, handlers=handlers, app=KafkaConnectorConfig( - **{**connector_config.dict(), "topics": topic_name} + **{**connector_config.dict(), 
"topics": topic_name}, ), namespace="test-namespace", ) @@ -85,7 +85,7 @@ def test_connector_config_parsing( config=config, handlers=handlers, app=KafkaConnectorConfig( - **{**connector_config.dict(), "topics.regex": topic_pattern} + **{**connector_config.dict(), "topics.regex": topic_pattern}, ), namespace="test-namespace", ) @@ -109,7 +109,7 @@ def test_from_section_parsing_input_topic( topics={ topic1: FromTopic(type=InputTopicTypes.INPUT), topic2: FromTopic(type=InputTopicTypes.INPUT), - } + }, ), ) assert getattr(connector.app, "topics") == f"{topic1},{topic2}" @@ -132,7 +132,7 @@ def test_from_section_parsing_input_pattern( app=connector_config, namespace="test-namespace", from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type - topics={topic_pattern: FromTopic(type=InputTopicTypes.PATTERN)} + topics={topic_pattern: FromTopic(type=InputTopicTypes.PATTERN)}, ), ) assert getattr(connector.app, "topics.regex") == topic_pattern @@ -143,10 +143,10 @@ def test_deploy_order( mocker: MockerFixture, ): mock_create_topics = mocker.patch.object( - connector.handlers.topic_handler, "create_topics" + connector.handlers.topic_handler, "create_topics", ) mock_create_connector = mocker.patch.object( - connector.handlers.connector_handler, "create_connector" + connector.handlers.connector_handler, "create_connector", ) mock = mocker.MagicMock() @@ -164,13 +164,13 @@ def test_destroy( mocker: MockerFixture, ): mock_destroy_connector = mocker.patch.object( - connector.handlers.connector_handler, "destroy_connector" + connector.handlers.connector_handler, "destroy_connector", ) connector.destroy(dry_run=True) mock_destroy_connector.assert_called_once_with( - CONNECTOR_FULL_NAME, dry_run=True + CONNECTOR_FULL_NAME, dry_run=True, ) def test_reset_when_dry_run_is_true( @@ -191,10 +191,10 @@ def test_reset_when_dry_run_is_false( mocker: MockerFixture, ): mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" + connector.handlers.topic_handler, "delete_topics", ) mock_clean_connector = mocker.patch.object( - connector.handlers.connector_handler, "clean_connector" + connector.handlers.connector_handler, "clean_connector", ) mock = mocker.MagicMock() mock.attach_mock(mock_clean_connector, "mock_clean_connector") @@ -264,10 +264,10 @@ def test_clean_when_dry_run_is_false( mocker: MockerFixture, ): mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" + connector.handlers.topic_handler, "delete_topics", ) mock_clean_connector = mocker.patch.object( - connector.handlers.connector_handler, "clean_connector" + connector.handlers.connector_handler, "clean_connector", ) mock = mocker.MagicMock() @@ -281,13 +281,13 @@ def test_clean_when_dry_run_is_false( assert log_info_mock.mock_calls == [ call.log_info( magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_FULL_NAME}" - ) + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_FULL_NAME}", + ), ), call.log_info( magentaify( - f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_FULL_NAME}" - ) + f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_FULL_NAME}", + ), ), call.log_info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")), ] @@ -369,10 +369,10 @@ def test_clean_without_to_when_dry_run_is_false( ) mock_delete_topics = mocker.patch.object( - 
connector.handlers.topic_handler, "delete_topics" + connector.handlers.topic_handler, "delete_topics", ) mock_clean_connector = mocker.patch.object( - connector.handlers.connector_handler, "clean_connector" + connector.handlers.connector_handler, "clean_connector", ) mock = mocker.MagicMock() mock.attach_mock(mock_delete_topics, "mock_delete_topics") diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index db9a2dd77..18548df34 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -48,9 +48,9 @@ def connector( to=ToSection( topics={ TopicName("${output_topic_name}"): TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), - } + }, ), offset_topic="kafka-connect-offsets", ) @@ -71,9 +71,9 @@ def test_from_section_raises_exception( from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={ TopicName("connector-topic"): FromTopic( - type=InputTopicTypes.INPUT + type=InputTopicTypes.INPUT, ), - } + }, ), ) @@ -83,11 +83,11 @@ def test_deploy_order( mocker: MockerFixture, ): mock_create_topics = mocker.patch.object( - connector.handlers.topic_handler, "create_topics" + connector.handlers.topic_handler, "create_topics", ) mock_create_connector = mocker.patch.object( - connector.handlers.connector_handler, "create_connector" + connector.handlers.connector_handler, "create_connector", ) mock = mocker.MagicMock() @@ -108,13 +108,13 @@ def test_destroy( assert connector.handlers.connector_handler mock_destroy_connector = mocker.patch.object( - connector.handlers.connector_handler, "destroy_connector" + connector.handlers.connector_handler, "destroy_connector", ) connector.destroy(dry_run=True) mock_destroy_connector.assert_called_once_with( - CONNECTOR_FULL_NAME, dry_run=True + CONNECTOR_FULL_NAME, dry_run=True, ) def test_reset_when_dry_run_is_true( @@ -137,10 +137,10 @@ def test_reset_when_dry_run_is_false( ): assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" + connector.handlers.topic_handler, "delete_topics", ) mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, "clean_connector" + connector.handlers.connector_handler, "clean_connector", ) mock = mocker.MagicMock() @@ -210,10 +210,10 @@ def test_clean_when_dry_run_is_false( assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" + connector.handlers.topic_handler, "delete_topics", ) mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, "clean_connector" + connector.handlers.connector_handler, "clean_connector", ) mock = mocker.MagicMock() @@ -286,10 +286,10 @@ def test_clean_without_to_when_dry_run_is_false( assert connector.handlers.connector_handler mock_delete_topics = mocker.patch.object( - connector.handlers.topic_handler, "delete_topics" + connector.handlers.topic_handler, "delete_topics", ) mock_clean_connector = mocker.spy( - connector.handlers.connector_handler, "clean_connector" + connector.handlers.connector_handler, "clean_connector", ) mock = mocker.MagicMock() diff --git a/tests/components/test_kubernetes_app.py b/tests/components/test_kubernetes_app.py index 46eb9795d..cc2b4d275 100644 --- a/tests/components/test_kubernetes_app.py +++ 
b/tests/components/test_kubernetes_app.py @@ -46,7 +46,7 @@ def handlers(self) -> ComponentHandlers: @pytest.fixture def helm_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.components.base_components.kubernetes_app.Helm" + "kpops.components.base_components.kubernetes_app.Helm", ).return_value @pytest.fixture @@ -113,7 +113,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( app_value: KubernetesTestValue, ): repo_config = HelmRepoConfig( - repository_name="test-repo", url="https://test.com/charts/" + repository_name="test-repo", url="https://test.com/charts/", ) kubernetes_app = KubernetesApp( name="test-kubernetes-app", @@ -211,7 +211,7 @@ def test_should_call_helm_uninstall_when_destroying_kubernetes_app( kubernetes_app.destroy(True) helm_mock.uninstall.assert_called_once_with( - "test-namespace", "${pipeline_name}-test-kubernetes-app", True + "test-namespace", "${pipeline_name}-test-kubernetes-app", True, ) log_info_mock.assert_called_once_with(magentaify(stdout)) @@ -224,7 +224,7 @@ def test_should_raise_value_error_when_name_is_not_valid( repo_config: HelmRepoConfig, ): with pytest.raises( - ValueError, match=r"The component name .* is invalid for Kubernetes." + ValueError, match=r"The component name .* is invalid for Kubernetes.", ): KubernetesApp( name="Not-Compatible*", @@ -236,7 +236,7 @@ def test_should_raise_value_error_when_name_is_not_valid( ) with pytest.raises( - ValueError, match=r"The component name .* is invalid for Kubernetes." + ValueError, match=r"The component name .* is invalid for Kubernetes.", ): KubernetesApp( name="snake_case*", diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 56d52a68b..216202b60 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -42,7 +42,7 @@ def config(self) -> PipelineConfig: @pytest.fixture def producer_app( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ) -> ProducerApp: return ProducerApp( name=self.PRODUCER_APP_NAME, @@ -58,9 +58,9 @@ def producer_app( "to": { "topics": { "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), - } + }, }, }, ) @@ -79,20 +79,20 @@ def test_output_topics(self, config: PipelineConfig, handlers: ComponentHandlers "to": { "topics": { "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), "extra-topic-1": TopicConfig( role="first-extra-topic", partitions_count=10, ), - } + }, }, }, ) assert producer_app.app.streams.output_topic == "${output_topic_name}" assert producer_app.app.streams.extra_output_topics == { - "first-extra-topic": "extra-topic-1" + "first-extra-topic": "extra-topic-1", } def test_deploy_order_when_dry_run_is_false( @@ -101,11 +101,11 @@ def test_deploy_order_when_dry_run_is_false( mocker: MockerFixture, ): mock_create_topics = mocker.patch.object( - producer_app.handlers.topic_handler, "create_topics" + producer_app.handlers.topic_handler, "create_topics", ) mock_helm_upgrade_install = mocker.patch.object( - producer_app.helm, "upgrade_install" + producer_app.helm, "upgrade_install", ) mock = mocker.MagicMock() @@ -150,7 +150,7 @@ def test_destroy( producer_app.destroy(dry_run=True) mock_helm_uninstall.assert_called_once_with( - "test-namespace", "${pipeline_name}-" + self.PRODUCER_APP_NAME, 
True + "test-namespace", "${pipeline_name}-" + self.PRODUCER_APP_NAME, True, ) def test_should_not_reset_producer_app( @@ -159,11 +159,11 @@ def test_should_not_reset_producer_app( mocker: MockerFixture, ): mock_helm_upgrade_install = mocker.patch.object( - producer_app.helm, "upgrade_install" + producer_app.helm, "upgrade_install", ) mock_helm_uninstall = mocker.patch.object(producer_app.helm, "uninstall") mock_helm_print_helm_diff = mocker.patch.object( - producer_app.dry_run_handler, "print_helm_diff" + producer_app.dry_run_handler, "print_helm_diff", ) mock = mocker.MagicMock() @@ -205,10 +205,10 @@ def test_should_not_reset_producer_app( ] def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false( - self, mocker: MockerFixture, producer_app: ProducerApp + self, mocker: MockerFixture, producer_app: ProducerApp, ): mock_helm_upgrade_install = mocker.patch.object( - producer_app.helm, "upgrade_install" + producer_app.helm, "upgrade_install", ) mock_helm_uninstall = mocker.patch.object(producer_app.helm, "uninstall") diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index dce2c7e96..8cc46d538 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -47,7 +47,7 @@ def config(self) -> PipelineConfig: @pytest.fixture def streams_app( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ) -> StreamsApp: return StreamsApp( name=self.STREAMS_APP_NAME, @@ -61,9 +61,9 @@ def streams_app( "to": { "topics": { "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), - } + }, }, }, ) @@ -91,7 +91,7 @@ def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers): "type": "pattern", "role": "another-pattern", }, - } + }, }, }, ) @@ -102,7 +102,7 @@ def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers): assert streams_app.app.streams.input_topics == ["example-input", "b", "a"] assert streams_app.app.streams.input_pattern == ".*" assert streams_app.app.streams.extra_input_patterns == { - "another-pattern": "example.*" + "another-pattern": "example.*", } helm_values = streams_app.to_helm_values() @@ -113,7 +113,7 @@ def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers): assert "extraInputPatterns" in streams_config def test_no_empty_input_topic( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ): streams_app = StreamsApp( name=self.STREAMS_APP_NAME, @@ -127,7 +127,7 @@ def test_no_empty_input_topic( "from": { "topics": { ".*": {"type": "pattern"}, - } + }, }, }, ) @@ -160,8 +160,8 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle "topic-input": { "type": "input", "role": "role", - } - } + }, + }, }, }, ) @@ -182,14 +182,14 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle "topic-input": { "type": "error", "role": "role", - } - } + }, + }, }, }, ) def test_set_streams_output_from_to( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ): streams_app = StreamsApp( name=self.STREAMS_APP_NAME, @@ -203,10 +203,10 @@ def test_set_streams_output_from_to( "to": { "topics": { "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + 
type=OutputTopicTypes.OUTPUT, partitions_count=10, ), "${error_topic_name}": TopicConfig( - type=OutputTopicTypes.ERROR, partitions_count=10 + type=OutputTopicTypes.ERROR, partitions_count=10, ), "extra-topic-1": TopicConfig( role="first-extra-topic", @@ -216,7 +216,7 @@ def test_set_streams_output_from_to( role="second-extra-topic", partitions_count=10, ), - } + }, }, }, ) @@ -228,7 +228,7 @@ def test_set_streams_output_from_to( assert streams_app.app.streams.error_topic == "${error_topic_name}" def test_weave_inputs_from_prev_component( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: PipelineConfig, handlers: ComponentHandlers, ): streams_app = StreamsApp( name=self.STREAMS_APP_NAME, @@ -246,19 +246,19 @@ def test_weave_inputs_from_prev_component( ToSection( topics={ TopicName("prev-output-topic"): TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), TopicName("b"): TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), TopicName("a"): TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), TopicName("prev-error-topic"): TopicConfig( - type=OutputTopicTypes.ERROR, partitions_count=10 + type=OutputTopicTypes.ERROR, partitions_count=10, ), - } - ) + }, + ), ) assert streams_app.app.streams.input_topics == ["prev-output-topic", "b", "a"] @@ -281,10 +281,10 @@ def test_deploy_order_when_dry_run_is_false( "to": { "topics": { "${output_topic_name}": TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 + type=OutputTopicTypes.OUTPUT, partitions_count=10, ), "${error_topic_name}": TopicConfig( - type=OutputTopicTypes.ERROR, partitions_count=10 + type=OutputTopicTypes.ERROR, partitions_count=10, ), "extra-topic-1": TopicConfig( role="first-extra-topic", @@ -294,15 +294,15 @@ def test_deploy_order_when_dry_run_is_false( role="second-extra-topic", partitions_count=10, ), - } + }, }, }, ) mock_create_topics = mocker.patch.object( - streams_app.handlers.topic_handler, "create_topics" + streams_app.handlers.topic_handler, "create_topics", ) mock_helm_upgrade_install = mocker.patch.object( - streams_app.helm, "upgrade_install" + streams_app.helm, "upgrade_install", ) mock = mocker.MagicMock() @@ -328,7 +328,7 @@ def test_deploy_order_when_dry_run_is_false( }, "outputTopic": "${output_topic_name}", "errorTopic": "${error_topic_name}", - } + }, }, HelmUpgradeInstallFlags( create_namespace=False, @@ -351,14 +351,14 @@ def test_destroy(self, streams_app: StreamsApp, mocker: MockerFixture): streams_app.destroy(dry_run=True) mock_helm_uninstall.assert_called_once_with( - "test-namespace", "${pipeline_name}-" + self.STREAMS_APP_NAME, True + "test-namespace", "${pipeline_name}-" + self.STREAMS_APP_NAME, True, ) def test_reset_when_dry_run_is_false( - self, streams_app: StreamsApp, mocker: MockerFixture + self, streams_app: StreamsApp, mocker: MockerFixture, ): mock_helm_upgrade_install = mocker.patch.object( - streams_app.helm, "upgrade_install" + streams_app.helm, "upgrade_install", ) mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall") @@ -402,7 +402,7 @@ def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean_up( mocker: MockerFixture, ): mock_helm_upgrade_install = mocker.patch.object( - streams_app.helm, "upgrade_install" + streams_app.helm, "upgrade_install", ) mock_helm_uninstall = mocker.patch.object(streams_app.helm, 
"uninstall") diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index cb58d19f0..0432a2184 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -51,12 +51,12 @@ def inflate(self) -> list[PipelineComponent]: to=ToSection( topics={ TopicName("${component_type}"): TopicConfig( - type=OutputTopicTypes.OUTPUT + type=OutputTopicTypes.OUTPUT, ), TopicName("${component_name}"): TopicConfig( - type=None, role="test" + type=None, role="test", ), - } + }, ), ) inflate_steps.append(kafka_connector) @@ -67,9 +67,9 @@ def inflate(self) -> list[PipelineComponent]: to=ToSection( # type: ignore topics={ TopicName( - f"{self.full_name}-" + "${component_name}" - ): TopicConfig(type=OutputTopicTypes.OUTPUT) - } + f"{self.full_name}-" + "${component_name}", + ): TopicConfig(type=OutputTopicTypes.OUTPUT), + }, ).dict(), ) inflate_steps.append(streams_app) @@ -79,7 +79,7 @@ def inflate(self) -> list[PipelineComponent]: class TestSchemaProvider(SchemaProvider): def provide_schema( - self, schema_class: str, models: dict[ModelName, ModelVersion] + self, schema_class: str, models: dict[ModelName, ModelVersion], ) -> Schema: schema = { "type": "record", diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index d5684178c..9ea414a9d 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -33,7 +33,7 @@ def inflate(self) -> list[PipelineComponent]: **{ "topics": topic_name, "transforms.changeTopic.replacement": f"{topic_name}-index-v1", - } + }, ), ) inflate_steps.append(kafka_connector) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index af9cde479..41208322b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -49,7 +49,7 @@ def test_load_pipeline(self, snapshot: SnapshotTest): snapshot.assert_match(enriched_pipeline, "test-pipeline") def test_generate_with_steps_flag_should_write_log_warning( - self, caplog: pytest.LogCaptureFixture + self, caplog: pytest.LogCaptureFixture, ): result = runner.invoke( app, @@ -73,7 +73,7 @@ def test_generate_with_steps_flag_should_write_log_warning( logging.WARNING, "The following flags are considered only when `--template` is set: \n \ '--steps'", - ) + ), ] assert result.exit_code == 0 diff --git a/tests/utils/test_dict_ops.py b/tests/utils/test_dict_ops.py index 1ea410770..e9a02fe5b 100644 --- a/tests/utils/test_dict_ops.py +++ b/tests/utils/test_dict_ops.py @@ -70,7 +70,7 @@ class SimpleModel(BaseModel): }, }, problems=99, - ).json() + ).json(), ) existing_substitution = { "key1": "Everything", diff --git a/tests/utils/test_diff.py b/tests/utils/test_diff.py index f2ffeac88..81b66b2cd 100644 --- a/tests/utils/test_diff.py +++ b/tests/utils/test_diff.py @@ -186,7 +186,7 @@ def test_render_diff(d1: dict, d2: dict, ignore: set[str] | None, output: str | diff_type=DiffType.CHANGE, key="a.b", change=Change(old_value=1, new_value=2), - ) + ), ], ), ], diff --git a/tests/utils/test_environment.py b/tests/utils/test_environment.py index 09bbb75de..88e84707a 100644 --- a/tests/utils/test_environment.py +++ b/tests/utils/test_environment.py @@ -91,7 +91,7 @@ def test_windows_behaviour_keys_transformation(system, fake_environment_windows) @patch("platform.system") def 
test_windows_behaviour_keys_transformation_as_kwargs( - system, fake_environment_windows + system, fake_environment_windows, ): system.return_value = "Windows" environment = Environment(**fake_environment_windows)