diff --git a/src/cosl/rules.py b/src/cosl/rules.py index 820aac0..c421620 100644 --- a/src/cosl/rules.py +++ b/src/cosl/rules.py @@ -78,6 +78,8 @@ import logging import os import re +import textwrap +from pprint import pprint from abc import ABC, abstractmethod from pathlib import Path from typing import Any, Dict, List, Optional, Union, cast @@ -240,6 +242,59 @@ def _from_dir(self, dir_path: Path, recursive: bool) -> List[Dict[str, Any]]: return groups + def _from_str(self, yaml_str: str): + yaml.safe_load(textwrap.dedent(yaml_str)) + rule_str = yaml.safe_load(yaml_str) + root_path = Path() + file_path = Path() + # print(f"HERE: {self.rule_type} && {type(rule_str)} && {type(cast(SingleRuleFormat, rule_str))}") + # pprint(rule_str) + + if self._is_official_rule_format(cast(OfficialRuleFileFormat, rule_str)): + rule_str = cast(OfficialRuleFileFormat, rule_str) + groups = rule_str["groups"] + elif self._is_single_rule_format(cast(SingleRuleFormat, rule_str), self.rule_type): + # convert to list of groups + # group name is made up from the file name + rule_str = cast(SingleRuleFormat, rule_str) + groups = [{"name": file_path.stem, "rules": [rule_str]}] + else: + # invalid/unsupported + logger.error("Invalid rules file: %s", file_path.name) + return [] + + # update rules with additional metadata + groups = cast(List[OfficialRuleFileItem], groups) + for group in groups: + if not self._is_already_modified(group["name"]): + # update group name with topology and sub-path + group["name"] = self._new_group_name(group["name"]) + + # add "juju_" topology labels + for rule in group["rules"]: + if "labels" not in rule: + rule["labels"] = {} + + if self.topology: + # only insert labels that do not already exist + for label, val in self.topology.label_matcher_dict.items(): + if label not in rule["labels"]: + rule["labels"][label] = val + + # insert juju topology filters into a prometheus rule + repl = r'job=~".+"' if self.query_type == "logql" else "" + rule["expr"] = self.tool.inject_label_matchers( # type: ignore + expression=re.sub(r"%%juju_topology%%,?", repl, rule["expr"]), + topology={ + k: rule["labels"][k] + for k in ("juju_model", "juju_model_uuid", "juju_application") + if rule["labels"].get(k) is not None + }, + query_type=self.query_type, + ) + + return groups + def _from_file( # noqa: C901 self, root_path: Path, file_path: Path ) -> List[OfficialRuleFileItem]: @@ -314,6 +369,15 @@ def _from_file( # noqa: C901 return groups + def _new_group_name(self, group_name: str) -> str: + # Generate group name: + # - name, from juju topology + # - suffix, from the relative path of the rule file; + group_name_parts = [self.topology.identifier] if self.topology else [] + group_name_parts.extend(["generic_rules", group_name, f"{self.rule_type}s"]) + # filter to remove empty strings + return "_".join(filter(None, group_name_parts)) + def _group_name(self, root_path: str, file_path: str, group_name: str) -> str: """Generate group name from path and topology. @@ -348,6 +412,11 @@ def _is_already_modified(self, name: str) -> bool: # ---- END STATIC HELPER METHODS --- # + def add(self, yaml_str: str): + temp = self._from_str(yaml_str) + pprint(temp) + self.groups.extend(temp) + def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None: """Add rules from a dir path. diff --git a/tests/test_rules_promql.py b/tests/test_rules_promql.py index afef8b8..be4c6a3 100644 --- a/tests/test_rules_promql.py +++ b/tests/test_rules_promql.py @@ -3,6 +3,7 @@ import json import re +import textwrap import unittest import uuid from pathlib import Path @@ -75,6 +76,48 @@ def test_each_alert_expression_is_topology_labeled(self): self.assertIn("juju_model_uuid", labels) self.assertIn("juju_application", labels) + def test_injecting_generic_alerts(self): + ri = AlertRules( + query_type="promql", + topology=JujuTopology( + model="unittest", + model_uuid=str(uuid.uuid4()), + unit="tester/0", + application="tester", + ), + ) + ri.add_path(Path(__file__).resolve().parent / "promql_rules" / "prometheus_alert_rules") + + # TODO SingleRuleFormat only require alert, expr, labels. Test others though + generic_alert_rules = """ + groups: + - name: GenericRules + rules: + - alert: HostDown + expr: up < 1 + for: 5m + labels: + severity: critical + - alert: HostUnavailable + expr: absent(up) + for: 5m + labels: + severity: critical + """ + + ri.add(generic_alert_rules) + + alerts = ri.as_dict() + self.assertIn("groups", alerts) + self.assertEqual(len(alerts["groups"]), 5) + group = alerts["groups"][0] + for rule in group["rules"]: + self.assertIn("expr", rule) + for labels in expression_labels(rule["expr"]): + self.assertIn("juju_model", labels) + self.assertIn("juju_model_uuid", labels) + self.assertIn("juju_application", labels) + def sorted_matchers(matchers) -> str: parts = [m.strip() for m in matchers.split(",")]