Skip to content

Commit

Permalink
test from file alerts
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelThamm committed Dec 13, 2024
1 parent da3eaff commit bbb5e08
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 0 deletions.
69 changes: 69 additions & 0 deletions src/cosl/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import logging
import os
import re
import textwrap
from pprint import pprint
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
Expand Down Expand Up @@ -240,6 +242,59 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]:

return groups

def _from_str(self, yaml_str: str):
yaml.safe_load(textwrap.dedent(yaml_str))
rule_str = yaml.safe_load(yaml_str)
root_path = Path()
file_path = Path()
# print(f"HERE: {self.rule_type} && {type(rule_str)} && {type(cast(SingleRuleFormat, rule_str))}")
# pprint(rule_str)

if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_str)):
rule_str = cast(OfficialRuleFileFormat, rule_str)
groups = rule_str["groups"]
elif self._is_single_rule_format(cast(SingleRuleFormat, rule_str), self.rule_type):
# convert to list of groups
# group name is made up from the file name
rule_str = cast(SingleRuleFormat, rule_str)
groups = [{"name": file_path.stem, "rules": [rule_str]}]
else:
# invalid/unsupported
logger.error("Invalid rules file: %s", file_path.name)
return []

# update rules with additional metadata
groups = cast(List[OfficialRuleFileItem], groups)
for group in groups:
if not self._is_already_modified(group["name"]):
# update group name with topology and sub-path
group["name"] = self._new_group_name(group["name"])

# add "juju_" topology labels
for rule in group["rules"]:
if "labels" not in rule:
rule["labels"] = {}

if self.topology:
# only insert labels that do not already exist
for label, val in self.topology.label_matcher_dict.items():
if label not in rule["labels"]:
rule["labels"][label] = val

# insert juju topology filters into a prometheus rule
repl = r'job=~".+"' if self.query_type == "logql" else ""
rule["expr"] = self.tool.inject_label_matchers( # type: ignore
expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]),
topology={
k: rule["labels"][k]
for k in ("juju_model", "juju_model_uuid", "juju_application")
if rule["labels"].get(k) is not None
},
query_type=self.query_type,
)

return groups

def _from_file( # noqa: C901
self, root_path: Path, file_path: Path
) -> List[OfficialRuleFileItem]:
Expand Down Expand Up @@ -314,6 +369,15 @@ def _from_file( # noqa: C901

return groups

def _new_group_name(self, group_name: str) -> str:
# Generate group name:
# - name, from juju topology
# - suffix, from the relative path of the rule file;
group_name_parts = [self.topology.identifier] if self.topology else []
group_name_parts.extend(["generic_rules", group_name, f"{self.rule_type}s"])
# filter to remove empty strings
return "_".join(filter(None, group_name_parts))

def _group_name(self, root_path: str, file_path: str, group_name: str) -> str:
"""Generate group name from path and topology.
Expand Down Expand Up @@ -348,6 +412,11 @@ def _is_already_modified(self, name: str) -> bool:

# ---- END STATIC HELPER METHODS --- #

def add(self, yaml_str: str):
temp = self._from_str(yaml_str)
pprint(temp)
self.groups.extend(temp)

def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None:
"""Add rules from a dir path.
Expand Down
43 changes: 43 additions & 0 deletions tests/test_rules_promql.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import json
import re
import textwrap
import unittest
import uuid
from pathlib import Path
Expand Down Expand Up @@ -75,6 +76,48 @@ def test_each_alert_expression_is_topology_labeled(self):
self.assertIn("juju_model_uuid", labels)
self.assertIn("juju_application", labels)

def test_injecting_generic_alerts(self):
ri = AlertRules(
query_type="promql",
topology=JujuTopology(
model="unittest",
model_uuid=str(uuid.uuid4()),
unit="tester/0",
application="tester",
),
)
ri.add_path(Path(__file__).resolve().parent / "promql_rules" / "prometheus_alert_rules")

# TODO SingleRuleFormat only require alert, expr, labels. Test others though
generic_alert_rules = """
groups:
- name: GenericRules
rules:
- alert: HostDown
expr: up < 1
for: 5m
labels:
severity: critical
- alert: HostUnavailable
expr: absent(up)
for: 5m
labels:
severity: critical
"""

ri.add(generic_alert_rules)

alerts = ri.as_dict()
self.assertIn("groups", alerts)
self.assertEqual(len(alerts["groups"]), 5)
group = alerts["groups"][0]
for rule in group["rules"]:
self.assertIn("expr", rule)
for labels in expression_labels(rule["expr"]):
self.assertIn("juju_model", labels)
self.assertIn("juju_model_uuid", labels)
self.assertIn("juju_application", labels)


def sorted_matchers(matchers) -> str:
parts = [m.strip() for m in matchers.split(",")]
Expand Down

0 comments on commit bbb5e08

Please sign in to comment.