Skip to content

Commit

Permalink
Refactor "_from_file"
Browse files Browse the repository at this point in the history
  • Loading branch information
sed-i committed Dec 13, 2024
1 parent bbb5e08 commit 80a5664
Showing 1 changed file with 38 additions and 99 deletions.
137 changes: 38 additions & 99 deletions src/cosl/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
- `juju_model_uuid`
- `juju_application`
""" # noqa: W505

import hashlib
import logging
import os
import re
Expand Down Expand Up @@ -242,33 +242,41 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:

return groups

def _from_str(self, yaml_str: str):
yaml.safe_load(textwrap.dedent(yaml_str))
rule_str = yaml.safe_load(yaml_str)
root_path = Path()
file_path = Path()
# print(f"HERE: {self.rule_type} && {type(rule_str)} && {type(cast(SingleRuleFormat, rule_str))}")
# pprint(rule_str)

if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_str)):
rule_str = cast(OfficialRuleFileFormat, rule_str)
groups = rule_str["groups"]
elif self._is_single_rule_format(cast(SingleRuleFormat, rule_str), self.rule_type):
def _from_str(self, yaml_str: str, *, group_name: Optional[str]=None, group_name_prefix: str) -> List[dict]:
"""Process rules file (TODO: improve desc).
Raises:
ValueError, when invalid rule format given.
"""
if not yaml_str:
raise ValueError("Empty")
if not isinstance(yaml_str, dict):
raise ValueError("Invalid rules (must be a dict)")

if self._is_official_rule_format(cast(OfficialRuleFileFormat, yaml_str)):
yaml_str = cast(OfficialRuleFileFormat, yaml_str)
groups = yaml_str["groups"]
elif self._is_single_rule_format(cast(SingleRuleFormat, yaml_str), self.rule_type):
# convert to list of groups
# group name is made up from the file name
rule_str = cast(SingleRuleFormat, rule_str)
groups = [{"name": file_path.stem, "rules": [rule_str]}]
yaml_str = cast(SingleRuleFormat, yaml_str)
if not group_name:
# Note: the caller of this function should make sure this never happens:
# Either we use the standard format, or we'd pass a group_name.
# If/when we drop support for the single-rule-per-file format, this won't
# be needed anymore.
group_name = hashlib.shake_256(str(yaml_str).encode("utf-8")).hexdigest(20)
groups = [{"name": group_name, "rules": [yaml_str]}]
else:
# invalid/unsupported
logger.error("Invalid rules file: %s", file_path.name)
return []
raise ValueError("Invalid rule format")

# update rules with additional metadata
groups = cast(List[OfficialRuleFileItem], groups)
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = self._new_group_name(group["name"])
group["name"] = "_".join(filter(None, [group_name_prefix, group_name, f"{self.rule_type}s"]))

# add "juju_" topology labels
for rule in group["rules"]:
Expand Down Expand Up @@ -317,92 +325,23 @@ def _from_file( # noqa: C901
logger.error("Failed to read rules from %s: %s", file_path.name, e)
return []

if not rule_file:
logger.warning("Empty rules file: %s", file_path.name)
return []
if not isinstance(rule_file, dict):
logger.error("Invalid rules file (must be a dict): %s", file_path.name)
return []
# Generate group name prefix
# - name, from juju topology
# - suffix, from the relative path of the rule file;
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.append(rel_path)
group_name_prefix = "_".join(filter(None, group_name_parts))

if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_file)):
rule_file = cast(OfficialRuleFileFormat, rule_file)
groups = rule_file["groups"]
elif self._is_single_rule_format(cast(SingleRuleFormat, rule_file), self.rule_type):
# convert to list of groups
# group name is made up from the file name
rule_file = cast(SingleRuleFormat, rule_file)
groups = [{"name": file_path.stem, "rules": [rule_file]}]
else:
# invalid/unsupported
logger.error("Invalid rules file: %s", file_path.name)
try:
groups = self._from_str(rule_file, group_name_prefix=group_name_prefix)
except ValueError as e:
logger.error("Invalid rules file: %s (%s)", file_path.name, e)
return []

# update rules with additional metadata
groups = cast(List[OfficialRuleFileItem], groups)
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = self._group_name(str(root_path), str(file_path), group["name"])

# add "juju_" topology labels
for rule in group["rules"]:
if "labels" not in rule:
rule["labels"] = {}

if self.topology:
# only insert labels that do not already exist
for label, val in self.topology.label_matcher_dict.items():
if label not in rule["labels"]:
rule["labels"][label] = val

# insert juju topology filters into a prometheus rule
repl = r'job=~".+"' if self.query_type == "logql" else ""
rule["expr"] = self.tool.inject_label_matchers( # type: ignore
expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]),
topology={
k: rule["labels"][k]
for k in ("juju_model", "juju_model_uuid", "juju_application")
if rule["labels"].get(k) is not None
},
query_type=self.query_type,
)

return groups

def _new_group_name(self, group_name: str) -> str:
# Generate group name:
# - name, from juju topology
# - suffix, from the relative path of the rule file;
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.extend(["generic_rules", group_name, f"{self.rule_type}s"])
# filter to remove empty strings
return "_".join(filter(None, group_name_parts))

def _group_name(self, root_path: str, file_path: str, group_name: str) -> str:
"""Generate group name from path and topology.
The group name is made up of the relative path between the root dir_path, the file path,
and topology identifier.
Args:
root_path: path to the root rules dir.
file_path: path to rule file.
group_name: original group name to keep as part of the new augmented group name
Returns:
New group name, augmented by juju topology and relative path.
"""
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")

# Generate group name:
# - name, from juju topology
# - suffix, from the relative path of the rule file;
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.extend([rel_path, group_name, f"{self.rule_type}s"])
# filter to remove empty strings
return "_".join(filter(None, group_name_parts))

def _is_already_modified(self, name: str) -> bool:
"""Detect whether a group name has already been modified with juju topology."""
modified_matcher = re.compile(r"^.*?_[\da-f]{8}_.*?alerts$")
Expand Down

0 comments on commit 80a5664

Please sign in to comment.