Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Generic Alert Rules #115

Merged
merged 12 commits into from
Dec 20, 2024
190 changes: 113 additions & 77 deletions src/cosl/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
- `juju_application`
""" # noqa: W505

import hashlib
import logging
import os
import re
Expand All @@ -86,11 +87,9 @@

from . import CosTool, JujuTopology
from .types import (
OfficialRuleFileFormat,
OfficialRuleFileItem,
QueryType,
RuleType,
SingleRuleFormat,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -149,7 +148,7 @@ def __init__(self, query_type: QueryType, topology: Optional[JujuTopology] = Non
self.query_type = query_type
self.topology = topology
self.tool = CosTool(default_query_type=query_type)
self.groups = [] # type: List[Dict[str, Any]]
self.groups: List[OfficialRuleFileItem] = []

@property
@abstractmethod
Expand All @@ -160,7 +159,7 @@ def rule_type(self) -> RuleType:
# --- HELPER METHODS FOR READING FILES, SHOULD BE STATIC --- #

@staticmethod
def _is_official_rule_format(rules_dict: OfficialRuleFileFormat) -> bool:
def _is_official_rule_format(rules_dict: Dict[str, Any]) -> bool:
"""Are rules in the upstream format as supported by Prometheus or Loki.

Rules in dictionary format are in "official" form if they
Expand All @@ -176,7 +175,7 @@ def _is_official_rule_format(rules_dict: OfficialRuleFileFormat) -> bool:
return "groups" in rules_dict

@staticmethod
def _is_single_rule_format(rules_dict: SingleRuleFormat, rule_type: RuleType) -> bool:
def _is_single_rule_format(rules_dict: Dict[str, Any], rule_type: RuleType) -> bool:
"""Are alert rules in single rule format.

This library supports reading of rules in a custom format that
Expand Down Expand Up @@ -212,7 +211,7 @@ def _multi_suffix_glob(
all_files_in_dir = dir_path.glob("**/*" if recursive else "*")
return list(filter(lambda f: f.is_file() and f.suffix in suffixes, all_files_in_dir))

def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:
def _from_dir(self, dir_path: Path, recursive: bool) -> List[OfficialRuleFileItem]:
"""Read all rule files in a directory.

All rules from files for the same directory are loaded into a single
Expand All @@ -227,7 +226,7 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:
a list of dictionaries representing prometheus rule groups, each dictionary
representing a group (structure determined by `yaml.safe_load`).
"""
groups = [] # type: List[Dict[str, Any]]
groups: List[OfficialRuleFileItem] = []

# Gather all records into a list of groups
for file_path in Rules._multi_suffix_glob(
Expand All @@ -243,7 +242,7 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:
def _from_file( # noqa: C901
self, root_path: Path, file_path: Path
) -> List[OfficialRuleFileItem]:
"""Read a rules file from path, injecting juju topology.
"""Read a rules file from path.

Args:
root_path: full path to the root rules folder (used only for generating group name)
Expand All @@ -262,82 +261,98 @@ def _from_file( # noqa: C901
logger.error("Failed to read rules from %s: %s", file_path.name, e)
return []

if not rule_file:
logger.warning("Empty rules file: %s", file_path.name)
return []
if not isinstance(rule_file, dict):
logger.error("Invalid rules file (must be a dict): %s", file_path.name)
return []
# Generate group name prefix
# - name, from juju topology
# - suffix, from the relative path of the rule file;
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.append(rel_path)
group_name_prefix = "_".join(filter(None, group_name_parts))

if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_file)):
rule_file = cast(OfficialRuleFileFormat, rule_file)
groups = rule_file["groups"]
elif self._is_single_rule_format(cast(SingleRuleFormat, rule_file), self.rule_type):
# convert to list of groups
# group name is made up from the file name
rule_file = cast(SingleRuleFormat, rule_file)
groups = [{"name": file_path.stem, "rules": [rule_file]}]
else:
# invalid/unsupported
logger.error("Invalid rules file: %s", file_path.name)
try:
groups = self._from_str(
rule_file, group_name=file_path.stem, group_name_prefix=group_name_prefix
)
except ValueError as e:
logger.error("Invalid rules file: %s (%s)", file_path.name, e)
return []

# update rules with additional metadata
groups = cast(List[OfficialRuleFileItem], groups)
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = self._group_name(str(root_path), str(file_path), group["name"])

# add "juju_" topology labels
for rule in group["rules"]:
if "labels" not in rule:
rule["labels"] = {}

if self.topology:
# only insert labels that do not already exist
for label, val in self.topology.label_matcher_dict.items():
if label not in rule["labels"]:
rule["labels"][label] = val

# insert juju topology filters into a prometheus rule
repl = r'job=~".+"' if self.query_type == "logql" else ""
rule["expr"] = self.tool.inject_label_matchers( # type: ignore
expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]),
topology={
k: rule["labels"][k]
for k in ("juju_model", "juju_model_uuid", "juju_application")
if rule["labels"].get(k) is not None
},
query_type=self.query_type,
)

return groups

def _group_name(self, root_path: str, file_path: str, group_name: str) -> str:
"""Generate group name from path and topology.

The group name is made up of the relative path between the root dir_path, the file path,
and topology identifier.
def _from_str(
self,
yaml_str: Dict[str, Any],
*,
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> List[OfficialRuleFileItem]:
"""Process rules from string, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness.
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved

Args:
root_path: path to the root rules dir.
file_path: path to rule file.
group_name: original group name to keep as part of the new augmented group name
yaml_str: rules content in single-rule or official-rule format as a YAML dict
group_name: a custom identifier for the rule name to include in the group name
group_name_prefix: a custom group identifier to prefix the resulting group name, likely Juju topology and relative path context

Returns:
New group name, augmented by juju topology and relative path.
Raises:
ValueError, when invalid rule format given.
"""
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")
if not yaml_str:
raise ValueError("Empty")

if self._is_official_rule_format(yaml_str):
yaml_str = yaml_str
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
groups = yaml_str["groups"]
elif self._is_single_rule_format(yaml_str, self.rule_type):
yaml_str = yaml_str
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
if not group_name:
# Note: the caller of this function should ensure this never happens:
# Either we use the standard format, or we'd pass a group_name.
# If/when we drop support for the single-rule-per-file format, this won't
# be needed anymore.
group_name = hashlib.shake_256(str(yaml_str).encode("utf-8")).hexdigest(10)
else:
group_name = self._sanitize_metric_name(group_name)
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved

# Generate group name:
# - name, from juju topology
# - suffix, from the relative path of the rule file;
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.extend([rel_path, group_name, f"{self.rule_type}s"])
# filter to remove empty strings
return "_".join(filter(None, group_name_parts))
# convert to list of groups to match official rule format
groups = [{"name": group_name, "rules": [yaml_str]}]
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
else:
# invalid/unsupported
raise ValueError("Invalid rule format")

# update rules with additional metadata
groups = cast(List[OfficialRuleFileItem], groups)
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = "_".join(
filter(None, [group_name_prefix, group["name"], f"{self.rule_type}s"])
)

# add "juju_" topology labels
for rule in group["rules"]:
if "labels" not in rule:
rule["labels"] = {}

if self.topology:
# only insert labels that do not already exist
for label, val in self.topology.label_matcher_dict.items():
if label not in rule["labels"]:
rule["labels"][label] = val

# insert juju topology filters into a prometheus rule
repl = r'job=~".+"' if self.query_type == "logql" else ""
rule["expr"] = self.tool.inject_label_matchers( # type: ignore
expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]),
topology={
k: rule["labels"][k]
for k in ("juju_model", "juju_model_uuid", "juju_application")
if rule["labels"].get(k) is not None
},
query_type=self.query_type,
)

return groups

def _is_already_modified(self, name: str) -> bool:
"""Detect whether a group name has already been modified with juju topology."""
Expand All @@ -346,8 +361,32 @@ def _is_already_modified(self, name: str) -> bool:
return False
return True

def _sanitize_metric_name(self, metric_name: str) -> str:
"""Sanitize a metric name according to https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels."""
return "".join(char if re.match(r"[a-zA-Z0-9_:]", char) else "_" for char in metric_name)

# ---- END STATIC HELPER METHODS --- #

def add(
self,
yaml_str: Dict[str, Any],
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> None:
"""Add rules from a string to the existing ruleset.

Args:
yaml_str: a single-rule or official-rule YAML dict
group_name: a custom group name, used only if the new rule is of single-rule format
group_name_prefix: a custom group name prefix, used only if the new rule is of single-rule format
"""
kwargs: Dict[str, str] = {}
if group_name is not None:
kwargs["group_name"] = group_name
if group_name_prefix is not None:
kwargs["group_name_prefix"] = group_name_prefix
MichaelThamm marked this conversation as resolved.
Show resolved Hide resolved
self.groups.extend(self._from_str(yaml_str, **kwargs))

def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None:
"""Add rules from a dir path.

Expand All @@ -357,9 +396,6 @@ def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> No
Args:
dir_path: either a rules file or a dir of rules files.
recursive: whether to read files recursively or not (no impact if `path` is a file).

Returns:
True if path was added else False.
"""
path = Path(dir_path) if isinstance(dir_path, str) else dir_path
if path.is_dir():
Expand Down
Loading
Loading