diff --git a/kpops/cli/registry.py b/kpops/cli/registry.py index fc40f8938..30a9c1205 100644 --- a/kpops/cli/registry.py +++ b/kpops/cli/registry.py @@ -38,8 +38,9 @@ def __getitem__(self, component_type: str) -> type[PipelineComponent]: try: return self._classes[component_type] except KeyError as ke: + msg = f"Could not find a component of type {component_type}" raise ClassNotFoundError( - f"Could not find a component of type {component_type}", + msg, ) from ke diff --git a/kpops/component_handlers/helm_wrapper/helm.py b/kpops/component_handlers/helm_wrapper/helm.py index 0eb519cb6..f717f2e6d 100644 --- a/kpops/component_handlers/helm_wrapper/helm.py +++ b/kpops/component_handlers/helm_wrapper/helm.py @@ -29,8 +29,9 @@ def __init__(self, helm_config: HelmConfig) -> None: self._debug = helm_config.debug self._version = self.get_version() if self._version.major != 3: + msg = f"The supported Helm version is 3.x.x. The current Helm version is {self._version.major}.{self._version.minor}.{self._version.patch}" raise RuntimeError( - f"The supported Helm version is 3.x.x. The current Helm version is {self._version.major}.{self._version.minor}.{self._version.patch}", + msg, ) def add_repo( @@ -183,8 +184,9 @@ def get_version(self) -> Version: short_version = self.__execute(command) version_match = re.search(r"^v(\d+(?:\.\d+){0,2})", short_version) if version_match is None: + msg = f"Could not parse the Helm version.\n\nHelm output:\n{short_version}" raise RuntimeError( - f"Could not parse the Helm version.\n\nHelm output:\n{short_version}", + msg, ) version = map(int, version_match.group(1).split(".")) return Version(*version) diff --git a/kpops/component_handlers/helm_wrapper/model.py b/kpops/component_handlers/helm_wrapper/model.py index 93e0116e2..8c6c09c32 100644 --- a/kpops/component_handlers/helm_wrapper/model.py +++ b/kpops/component_handlers/helm_wrapper/model.py @@ -180,7 +180,8 @@ def parse_source(source: str) -> str: # Source: chart/templates/serviceaccount.yaml """ if not source.startswith(HELM_SOURCE_PREFIX): - raise ParseError("Not a valid Helm template source") + msg = "Not a valid Helm template source" + raise ParseError(msg) return source.removeprefix(HELM_SOURCE_PREFIX).strip() @classmethod diff --git a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py index fb81aa411..27aad212f 100644 --- a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py +++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py @@ -114,8 +114,9 @@ def __dry_run_connector_creation( errors = self._connect_wrapper.validate_connector_config(connector_config) if len(errors) > 0: formatted_errors = "\n".join(errors) + msg = f"Connector Creation: validating the connector config for connector {connector_name} resulted in the following errors: {formatted_errors}" raise ConnectorStateException( - f"Connector Creation: validating the connector config for connector {connector_name} resulted in the following errors: {formatted_errors}", + msg, ) else: log.info( diff --git a/kpops/component_handlers/kafka_connect/model.py b/kpops/component_handlers/kafka_connect/model.py index 99964d3c5..e83e33e5d 100644 --- a/kpops/component_handlers/kafka_connect/model.py +++ b/kpops/component_handlers/kafka_connect/model.py @@ -31,7 +31,8 @@ def schema_extra(cls, schema: dict[str, Any], model: type[BaseModel]) -> None: @validator("connector_class") def connector_class_must_contain_dot(cls, connector_class: str) -> str: if "." not in connector_class: - raise ValueError(f"Invalid connector class {connector_class}") + msg = f"Invalid connector class {connector_class}" + raise ValueError(msg) return connector_class @property diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index 8c2065f7a..9bf068438 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -29,15 +29,16 @@ def __init__(self, url: str, components_module: str | None): def schema_provider(self) -> SchemaProvider: try: if not self.components_module: + msg = f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists." raise ValueError( - f"The Schema Registry URL is set but you haven't specified the component module path. Please provide a valid component module path where your {SchemaProvider.__name__} implementation exists.", + msg, ) schema_provider_class = find_class(self.components_module, SchemaProvider) return schema_provider_class() # pyright: ignore[reportGeneralTypeIssues] except ClassNotFoundError as e: + msg = f"No schema provider found in components module {self.components_module}. Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}." raise ValueError( - f"No schema provider found in components module {self.components_module}. " - f"Please implement the abstract method in {SchemaProvider.__module__}.{SchemaProvider.__name__}.", + msg, ) from e @classmethod @@ -144,8 +145,9 @@ def __check_compatibility( if isinstance(schema, AvroSchema) else str(schema) ) + msg = f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}" raise Exception( - f"Schema is not compatible for {subject} and model {schema_class}. \n {json.dumps(schema_str, indent=4)}", + msg, ) else: log.debug( diff --git a/kpops/component_handlers/topic/handler.py b/kpops/component_handlers/topic/handler.py index b436b20e3..38de9c66b 100644 --- a/kpops/component_handlers/topic/handler.py +++ b/kpops/component_handlers/topic/handler.py @@ -148,8 +148,9 @@ def __check_partition_count( f"Topic Creation: partition count of topic {topic_name} did not change. Current partitions count {partition_count}. Updating configs.", ) else: + msg = f"Topic Creation: partition count of topic {topic_name} changed! Partitions count of topic {topic_name} is {partition_count}. The given partitions count {topic_spec.partitions_count}." raise TopicTransactionError( - f"Topic Creation: partition count of topic {topic_name} changed! Partitions count of topic {topic_name} is {partition_count}. The given partitions count {topic_spec.partitions_count}.", + msg, ) @staticmethod @@ -168,8 +169,9 @@ def __check_replication_factor( f"Topic Creation: replication factor of topic {topic_name} did not change. Current replication factor {replication_factor}. Updating configs.", ) else: + msg = f"Topic Creation: replication factor of topic {topic_name} changed! Replication factor of topic {topic_name} is {replication_factor}. The given replication count {topic_spec.replication_factor}." raise TopicTransactionError( - f"Topic Creation: replication factor of topic {topic_name} changed! Replication factor of topic {topic_name} is {replication_factor}. The given replication count {topic_spec.replication_factor}.", + msg, ) def __dry_run_topic_deletion(self, topic_name: str) -> None: diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index 1a36e4b50..0d7ad2ebb 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -25,8 +25,9 @@ class ProxyWrapper: def __init__(self, pipeline_config: PipelineConfig) -> None: if not pipeline_config.kafka_rest_host: + msg = "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." raise ValueError( - "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST.", + msg, ) self._host = pipeline_config.kafka_rest_host diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index e789182ff..4cae46ce4 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -165,8 +165,9 @@ def defaults_from_yaml(path: Path, key: str) -> dict: """ content = load_yaml_file(path, substitution=ENV) if not isinstance(content, dict): + msg = "Default files should be structured as map ([app type] -> [default config]" raise TypeError( - "Default files should be structured as map ([app type] -> [default config]", + msg, ) value = content.get(key) if value is None: diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 2d6c8c8bc..584096990 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -85,7 +85,8 @@ def connector_config_should_have_component_name( component_name = values["prefix"] + values["name"] connector_name: str | None = app.get("name") if connector_name is not None and connector_name != component_name: - raise ValueError("Connector name should be the same as component name") + msg = "Connector name should be the same as component name" + raise ValueError(msg) app["name"] = component_name return app @@ -280,7 +281,8 @@ class KafkaSourceConnector(KafkaConnector): @override def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: - raise NotImplementedError("Kafka source connector doesn't support FromSection") + msg = "Kafka source connector doesn't support FromSection" + raise NotImplementedError(msg) @override def template(self) -> None: diff --git a/kpops/components/base_components/kubernetes_app.py b/kpops/components/base_components/kubernetes_app.py index b2676da7c..021f6d379 100644 --- a/kpops/components/base_components/kubernetes_app.py +++ b/kpops/components/base_components/kubernetes_app.py @@ -96,8 +96,9 @@ def helm_release_name(self) -> str: @property def helm_chart(self) -> str: """Return component's Helm chart.""" + msg = f"Please implement the helm_chart property of the {self.__module__} module." raise NotImplementedError( - f"Please implement the helm_chart property of the {self.__module__} module.", + msg, ) @property @@ -193,7 +194,8 @@ def validate_kubernetes_name(name: str) -> None: :raises ValueError: The component name {name} is invalid for Kubernetes. """ if not bool(KUBERNETES_NAME_CHECK_PATTERN.match(name)): - raise ValueError(f"The component name {name} is invalid for Kubernetes.") + msg = f"The component name {name} is invalid for Kubernetes." + raise ValueError(msg) @override def dict(self, *, exclude=None, **kwargs) -> dict[str, Any]: diff --git a/kpops/components/base_components/models/from_section.py b/kpops/components/base_components/models/from_section.py index aea159eb2..a53708c94 100644 --- a/kpops/components/base_components/models/from_section.py +++ b/kpops/components/base_components/models/from_section.py @@ -39,7 +39,8 @@ class Config(DescConfig): def extra_topic_role(cls, values: dict[str, Any]) -> dict[str, Any]: """Ensure that cls.role is used correctly, assign type if needed.""" if values["type"] == InputTopicTypes.INPUT and values["role"]: - raise ValueError("Define role only if `type` is `pattern` or `None`") + msg = "Define role only if `type` is `pattern` or `None`" + raise ValueError(msg) return values diff --git a/kpops/components/base_components/models/to_section.py b/kpops/components/base_components/models/to_section.py index 00393ee4e..843321916 100644 --- a/kpops/components/base_components/models/to_section.py +++ b/kpops/components/base_components/models/to_section.py @@ -67,7 +67,8 @@ class Config(DescConfig): def extra_topic_role(cls, values: dict[str, Any]) -> dict[str, Any]: """Ensure that cls.role is used correctly, assign type if needed.""" if values["type"] and values["role"]: - raise ValueError("Define `role` only if `type` is undefined") + msg = "Define `role` only if `type` is undefined" + raise ValueError(msg) return values diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 3a513c5a5..08e621019 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -40,7 +40,8 @@ class ProducerApp(KafkaApp): def apply_to_outputs(self, name: str, topic: TopicConfig) -> None: match topic.type: case OutputTopicTypes.ERROR: - raise ValueError("Producer apps do not support error topics") + msg = "Producer apps do not support error topics" + raise ValueError(msg) case _: super().apply_to_outputs(name, topic) diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py index 0ed15e9b1..b11e933fb 100644 --- a/kpops/pipeline_generator/pipeline.py +++ b/kpops/pipeline_generator/pipeline.py @@ -44,7 +44,8 @@ def find(self, component_name: str) -> PipelineComponent: for component in self.components: if component_name == component.name: return component - raise ValueError(f"Component {component_name} not found") + msg = f"Component {component_name} not found" + raise ValueError(msg) def add(self, component: PipelineComponent) -> None: self._populate_component_name(component) @@ -63,8 +64,9 @@ def validate_unique_names(self) -> None: step_names = [component.full_name for component in self.components] duplicates = [name for name, count in Counter(step_names).items() if count > 1] if duplicates: + msg = f"step names should be unique. duplicate step names: {', '.join(duplicates)}" raise ValidationError( - f"step names should be unique. duplicate step names: {', '.join(duplicates)}", + msg, ) @staticmethod @@ -87,8 +89,9 @@ def create_env_components_index( index: dict[str, dict] = {} for component in environment_components: if "type" not in component or "name" not in component: + msg = "To override components per environment, every component should at least have a type and a name." raise ValueError( - "To override components per environment, every component should at least have a type and a name.", + msg, ) index[component["name"]] = component return index @@ -137,15 +140,17 @@ def load_from_yaml( main_content = load_yaml_file(path, substitution=ENV) if not isinstance(main_content, list): + msg = f"The pipeline definition {path} should contain a list of components" raise TypeError( - f"The pipeline definition {path} should contain a list of components", + msg, ) env_content = [] if (env_file := Pipeline.pipeline_filename_environment(path, config)).exists(): env_content = load_yaml_file(env_file, substitution=ENV) if not isinstance(env_content, list): + msg = f"The pipeline definition {env_file} should contain a list of components" raise TypeError( - f"The pipeline definition {env_file} should contain a list of components", + msg, ) pipeline = cls(main_content, env_content, registry, config, handlers) @@ -164,15 +169,17 @@ def parse_components(self, component_list: list[dict]) -> None: try: component_type: str = component_data["type"] except KeyError as ke: + msg = "Every component must have a type defined, this component does not have one." raise ValueError( - "Every component must have a type defined, this component does not have one.", + msg, ) from ke component_class = self.registry[component_type] self.apply_component(component_class, component_data) except Exception as ex: # noqa: BLE001 if "name" in component_data: + msg = f"Error enriching {component_data['type']} component {component_data['name']}" raise ParsingException( - f"Error enriching {component_data['type']} component {component_data['name']}", + msg, ) from ex else: raise ParsingException() from ex @@ -336,7 +343,8 @@ def set_pipeline_name_env_vars(base_dir: Path, path: Path) -> None: """ path_without_file = path.resolve().relative_to(base_dir.resolve()).parts[:-1] if not path_without_file: - raise ValueError("The pipeline-base-dir should not equal the pipeline-path") + msg = "The pipeline-base-dir should not equal the pipeline-path" + raise ValueError(msg) pipeline_name = "-".join(path_without_file) ENV["pipeline_name"] = pipeline_name for level, parent in enumerate(path_without_file): diff --git a/kpops/utils/dict_differ.py b/kpops/utils/dict_differ.py index da47bd620..005cd1e71 100644 --- a/kpops/utils/dict_differ.py +++ b/kpops/utils/dict_differ.py @@ -40,7 +40,8 @@ def factory(type: DiffType, change: T | tuple[T, T]) -> Change: return Change(change, None) case DiffType.CHANGE if isinstance(change, tuple): return Change(*change) - raise ValueError(f"{type} is not part of {DiffType}") + msg = f"{type} is not part of {DiffType}" + raise ValueError(msg) @dataclass diff --git a/kpops/utils/dict_ops.py b/kpops/utils/dict_ops.py index 0f4643043..c52ae8e42 100644 --- a/kpops/utils/dict_ops.py +++ b/kpops/utils/dict_ops.py @@ -57,11 +57,13 @@ def flatten_mapping( :returns: "Flattened" mapping in the form of dict """ if not isinstance(nested_mapping, Mapping): - raise TypeError("Argument nested_mapping is not a Mapping") + msg = "Argument nested_mapping is not a Mapping" + raise TypeError(msg) top: dict[str, Any] = {} for key, value in nested_mapping.items(): if not isinstance(key, str): - raise TypeError(f"Argument nested_mapping contains a non-str key: {key}") + msg = f"Argument nested_mapping contains a non-str key: {key}" + raise TypeError(msg) if prefix: key = prefix + separator + key if isinstance(value, Mapping): diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index cbcf4beaa..e04259403 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -26,7 +26,8 @@ class SchemaScope(str, Enum): # adapted from https://github.com/tiangolo/fastapi/issues/1378#issuecomment-764966955 def field_schema(field: ModelField, **kwargs: Any) -> Any: if field.field_info.extra.get("hidden_from_schema"): - raise SkipField(f"{field.name} field is being hidden") + msg = f"{field.name} field is being hidden" + raise SkipField(msg) else: return original_field_schema(field, **kwargs) @@ -102,7 +103,8 @@ def gen_pipeline_schema( if components_module: components = _add_components(components_module, components) if not components: - raise RuntimeError("No valid components found.") + msg = "No valid components found." + raise RuntimeError(msg) # Create a type union that will hold the union of all component types PipelineComponents = Union[components] # type: ignore[valid-type] diff --git a/kpops/utils/yaml_loading.py b/kpops/utils/yaml_loading.py index 9dc53c1ab..d8aee8b95 100644 --- a/kpops/utils/yaml_loading.py +++ b/kpops/utils/yaml_loading.py @@ -70,7 +70,8 @@ def substitute_nested(input: str, **kwargs) -> str: steps.add(new_str) old_str, new_str = new_str, substitute(new_str, kwargs) if new_str != old_str: + msg = "An infinite loop condition detected. Check substitution variables." raise ValueError( - "An infinite loop condition detected. Check substitution variables.", + msg, ) return old_str