Skip to content

Commit

Permalink
feature to add alerts from string to support generic alert rules
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelThamm committed Dec 17, 2024
1 parent 80a5664 commit dc03e07
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 95 deletions.
142 changes: 87 additions & 55 deletions src/cosl/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,11 @@
- `juju_model_uuid`
- `juju_application`
""" # noqa: W505

import hashlib
import logging
import os
import re
import textwrap
from pprint import pprint
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
Expand Down Expand Up @@ -242,8 +241,60 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:

return groups

def _from_str(self, yaml_str: str, *, group_name: Optional[str]=None, group_name_prefix: str) -> List[dict]:
"""Process rules file (TODO: improve desc).
def _from_file( # noqa: C901
self, root_path: Path, file_path: Path
) -> List[OfficialRuleFileItem]:
"""Read a rules file from path.
Args:
root_path: full path to the root rules folder (used only for generating group name)
file_path: full path to a *.rule file.
Returns:
A list of dictionaries representing the rules file, if file is valid (the structure is
formed by `yaml.safe_load` of the file); an empty list otherwise.
"""
with file_path.open() as rf:
# Load a list of rules from file then add labels and filters
try:
rule_file = yaml.safe_load(rf)

except Exception as e:
logger.error("Failed to read rules from %s: %s", file_path.name, e)
return []

# Generate group name prefix
# - name, from juju topology
# - suffix, from the relative path of the rule file;
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.append(rel_path)
group_name_prefix = "_".join(filter(None, group_name_parts))

try:
groups = self._from_str(
rule_file, group_name=file_path.stem, group_name_prefix=group_name_prefix
)
except ValueError as e:
logger.error("Invalid rules file: %s (%s)", file_path.name, e)
return []

return groups

def _from_str(
self,
yaml_str: str,
*,
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> List[OfficialRuleFileItem]:
"""Process rules from string, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness.
Args:
yaml_str: rules content in single-rule or official-rule format as a string
group_name: a custom identifier for the rule name to include in the group name
group_name_prefix: a custom group identifier to prefix the resulting group name, likely Juju topology and relative path context
Raises:
ValueError, when invalid rule format given.
Expand All @@ -257,15 +308,17 @@ def _from_str(self, yaml_str: str, *, group_name: Optional[str]=None, group_name
yaml_str = cast(OfficialRuleFileFormat, yaml_str)
groups = yaml_str["groups"]
elif self._is_single_rule_format(cast(SingleRuleFormat, yaml_str), self.rule_type):
# convert to list of groups
# group name is made up from the file name
yaml_str = cast(SingleRuleFormat, yaml_str)
if not group_name:
# Note: the caller of this function should make sure this never happens:
# Note: the caller of this function should ensure this never happens:
# Either we use the standard format, or we'd pass a group_name.
# If/when we drop support for the single-rule-per-file format, this won't
# be needed anymore.
group_name = hashlib.shake_256(str(yaml_str).encode("utf-8")).hexdigest(20)
group_name = hashlib.shake_256(str(yaml_str).encode("utf-8")).hexdigest(10)
else:
group_name = self._sanitize_metric_name(group_name)

# convert to list of groups to match official rule format
groups = [{"name": group_name, "rules": [yaml_str]}]
else:
# invalid/unsupported
Expand All @@ -276,7 +329,9 @@ def _from_str(self, yaml_str: str, *, group_name: Optional[str]=None, group_name
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = "_".join(filter(None, [group_name_prefix, group_name, f"{self.rule_type}s"]))
group["name"] = "_".join(
filter(None, [group_name_prefix, group["name"], f"{self.rule_type}s"])
)

# add "juju_" topology labels
for rule in group["rules"]:
Expand All @@ -303,58 +358,38 @@ def _from_str(self, yaml_str: str, *, group_name: Optional[str]=None, group_name

return groups

def _from_file( # noqa: C901
self, root_path: Path, file_path: Path
) -> List[OfficialRuleFileItem]:
"""Read a rules file from path, injecting juju topology.
Args:
root_path: full path to the root rules folder (used only for generating group name)
file_path: full path to a *.rule file.
Returns:
A list of dictionaries representing the rules file, if file is valid (the structure is
formed by `yaml.safe_load` of the file); an empty list otherwise.
"""
with file_path.open() as rf:
# Load a list of rules from file then add labels and filters
try:
rule_file = yaml.safe_load(rf)

except Exception as e:
logger.error("Failed to read rules from %s: %s", file_path.name, e)
return []

# Generate group name prefix
# - name, from juju topology
# - suffix, from the relative path of the rule file;
rel_path = os.path.relpath(os.path.dirname(file_path), root_path)
rel_path = "" if rel_path == "." else rel_path.replace(os.path.sep, "_")
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.append(rel_path)
group_name_prefix = "_".join(filter(None, group_name_parts))

try:
groups = self._from_str(rule_file, group_name_prefix=group_name_prefix)
except ValueError as e:
logger.error("Invalid rules file: %s (%s)", file_path.name, e)
return []

return groups

def _is_already_modified(self, name: str) -> bool:
"""Detect whether a group name has already been modified with juju topology."""
modified_matcher = re.compile(r"^.*?_[\da-f]{8}_.*?alerts$")
if modified_matcher.match(name) is None:
return False
return True

def _sanitize_metric_name(self, metric_name: str) -> str:
"""Sanitize a metric name according to https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels."""
return "".join(char if re.match(r"[a-zA-Z0-9_:]", char) else "_" for char in metric_name)

# ---- END STATIC HELPER METHODS --- #

def add(self, yaml_str: str):
temp = self._from_str(yaml_str)
pprint(temp)
self.groups.extend(temp)
def add(
self,
yaml_str: str,
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> None:
"""Add rules from a string to the existing ruleset.
Args:
yaml_str: a single-rule or official-rule YAML string
group_name: a custom group name, used only if the new rule is of single-rule format
group_name_prefix: a custom group name prefix, used only if the new rule is of single-rule format
"""
kwargs = {}
if group_name is not None:
kwargs["group_name"] = group_name
if group_name_prefix is not None:
kwargs["group_name_prefix"] = group_name_prefix
self.groups.extend(self._from_str(yaml_str, **kwargs))

def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None:
"""Add rules from a dir path.
Expand All @@ -365,9 +400,6 @@ def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> No
Args:
dir_path: either a rules file or a dir of rules files.
recursive: whether to read files recursively or not (no impact if `path` is a file).
Returns:
True if path was added else False.
"""
path = Path(dir_path) if isinstance(dir_path, str) else dir_path
if path.is_dir():
Expand Down
Loading

0 comments on commit dc03e07

Please sign in to comment.