diff --git a/src/cosl/rules.py b/src/cosl/rules.py index c421620..64de85e 100644 --- a/src/cosl/rules.py +++ b/src/cosl/rules.py @@ -74,7 +74,7 @@ - `juju_model_uuid` - `juju_application` """ # noqa: W505 - +import hashlib import logging import os import re @@ -242,33 +242,41 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]: return groups - def _from_str(self, yaml_str: str): - yaml.safe_load(textwrap.dedent(yaml_str)) - rule_str = yaml.safe_load(yaml_str) - root_path = Path() - file_path = Path() - # print(f"HERE: {self.rule_type} && {type(rule_str)} && {type(cast(SingleRuleFormat, rule_str))}") - # pprint(rule_str) - - if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_str)): - rule_str = cast(OfficialRuleFileFormat, rule_str) - groups = rule_str["groups"] - elif self._is_single_rule_format(cast(SingleRuleFormat, rule_str), self.rule_type): + def _from_str(self, yaml_str: str, *, group_name: Optional[str]=None, group_name_prefix: str) -> List[dict]: + """Process rules file (TODO: improve desc). + + Raises: + ValueError, when invalid rule format given. + """ + if not yaml_str: + raise ValueError("Empty") + if not isinstance(yaml_str, dict): + raise ValueError("Invalid rules (must be a dict)") + + if self._is_official_rule_format(cast(OfficialRuleFileFormat, yaml_str)): + yaml_str = cast(OfficialRuleFileFormat, yaml_str) + groups = yaml_str["groups"] + elif self._is_single_rule_format(cast(SingleRuleFormat, yaml_str), self.rule_type): # convert to list of groups # group name is made up from the file name - rule_str = cast(SingleRuleFormat, rule_str) - groups = [{"name": file_path.stem, "rules": [rule_str]}] + yaml_str = cast(SingleRuleFormat, yaml_str) + if not group_name: + # Note: the caller of this function should make sure this never happens: + # Either we use the standard format, or we'd pass a group_name. + # If/when we drop support for the single-rule-per-file format, this won't + # be needed anymore. + group_name = hashlib.shake_256(str(yaml_str).encode("utf-8")).hexdigest(20) + groups = [{"name": group_name, "rules": [yaml_str]}] else: # invalid/unsupported - logger.error("Invalid rules file: %s", file_path.name) - return [] + raise ValueError("Invalid rule format") # update rules with additional metadata groups = cast(List[OfficialRuleFileItem], groups) for group in groups: if not self._is_already_modified(group["name"]): # update group name with topology and sub-path - group["name"] = self._new_group_name(group["name"]) + group["name"] = "_".join(filter(None, [group_name_prefix, group_name, f"{self.rule_type}s"])) # add "juju_" topology labels for rule in group["rules"]: @@ -317,92 +325,23 @@ def _from_file( # noqa: C901 logger.error("Failed to read rules from %s: %s", file_path.name, e) return [] - if not rule_file: - logger.warning("Empty rules file: %s", file_path.name) - return [] - if not isinstance(rule_file, dict): - logger.error("Invalid rules file (must be a dict): %s", file_path.name) - return [] + # Generate group name prefix + # - name, from juju topology + # - suffix, from the relative path of the rule file; + rel_path = os.path.relpath(os.path.dirname(file_path), root_path) + rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_") + group_name_parts = [self.topology.identifier] if self.topology else [] + group_name_parts.append(rel_path) + group_name_prefix = "_".join(filter(None, group_name_parts)) - if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_file)): - rule_file = cast(OfficialRuleFileFormat, rule_file) - groups = rule_file["groups"] - elif self._is_single_rule_format(cast(SingleRuleFormat, rule_file), self.rule_type): - # convert to list of groups - # group name is made up from the file name - rule_file = cast(SingleRuleFormat, rule_file) - groups = [{"name": file_path.stem, "rules": [rule_file]}] - else: - # invalid/unsupported - logger.error("Invalid rules file: %s", file_path.name) + try: + groups = self._from_str(rule_file, group_name_prefix=group_name_prefix) + except ValueError as e: + logger.error("Invalid rules file: %s (%s)", file_path.name, e) return [] - # update rules with additional metadata - groups = cast(List[OfficialRuleFileItem], groups) - for group in groups: - if not self._is_already_modified(group["name"]): - # update group name with topology and sub-path - group["name"] = self._group_name(str(root_path), str(file_path), group["name"]) - - # add "juju_" topology labels - for rule in group["rules"]: - if "labels" not in rule: - rule["labels"] = {} - - if self.topology: - # only insert labels that do not already exist - for label, val in self.topology.label_matcher_dict.items(): - if label not in rule["labels"]: - rule["labels"][label] = val - - # insert juju topology filters into a prometheus rule - repl = r'job=~".+"' if self.query_type == "logql" else "" - rule["expr"] = self.tool.inject_label_matchers( # type: ignore - expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]), - topology={ - k: rule["labels"][k] - for k in ("juju_model", "juju_model_uuid", "juju_application") - if rule["labels"].get(k) is not None - }, - query_type=self.query_type, - ) - return groups - def _new_group_name(self, group_name: str) -> str: - # Generate group name: - # - name, from juju topology - # - suffix, from the relative path of the rule file; - group_name_parts = [self.topology.identifier] if self.topology else [] - group_name_parts.extend(["generic_rules", group_name, f"{self.rule_type}s"]) - # filter to remove empty strings - return "_".join(filter(None, group_name_parts)) - - def _group_name(self, root_path: str, file_path: str, group_name: str) -> str: - """Generate group name from path and topology. - - The group name is made up of the relative path between the root dir_path, the file path, - and topology identifier. - - Args: - root_path: path to the root rules dir. - file_path: path to rule file. - group_name: original group name to keep as part of the new augmented group name - - Returns: - New group name, augmented by juju topology and relative path. - """ - rel_path = os.path.relpath(os.path.dirname(file_path), root_path) - rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_") - - # Generate group name: - # - name, from juju topology - # - suffix, from the relative path of the rule file; - group_name_parts = [self.topology.identifier] if self.topology else [] - group_name_parts.extend([rel_path, group_name, f"{self.rule_type}s"]) - # filter to remove empty strings - return "_".join(filter(None, group_name_parts)) - def _is_already_modified(self, name: str) -> bool: """Detect whether a group name has already been modified with juju topology.""" modified_matcher = re.compile(r"^.*?_[\da-f]{8}_.*?alerts$")