Skip to content

Commit

Permalink
update naming from str to dict
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelThamm committed Dec 19, 2024
1 parent 300298a commit 7b458ea
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 37 deletions.
32 changes: 17 additions & 15 deletions src/cosl/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def _from_file( # noqa: C901
group_name_prefix = "_".join(filter(None, group_name_parts))

try:
groups = self._from_str(
groups = self._from_dict(
rule_file, group_name=file_path.stem, group_name_prefix=group_name_prefix
)
except ValueError as e:
Expand All @@ -279,38 +279,40 @@ def _from_file( # noqa: C901

return groups

def _from_str(
def _from_dict(
self,
yaml_str: Dict[str, Any],
yaml_dict: Dict[str, Any],
*,
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> List[OfficialRuleFileItem]:
"""Process rules from string, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness.
"""Process rules from dict, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness.
Args:
yaml_str: rules content in single-rule or official-rule format as a YAML dict
yaml_dict: rules content in single-rule or official-rule format as a YAML dict
group_name: a custom identifier for the rule name to include in the group name
group_name_prefix: a custom group identifier to prefix the resulting group name, likely Juju topology and relative path context
Raises:
ValueError, when invalid rule format given.
"""
if not yaml_str:
# TODO _alerts_alerts is leftover code from elsewhere
# TODO rename this to be yaml_dict
if not yaml_dict:
raise ValueError("Empty")

if self._is_official_rule_format(yaml_str):
groups = yaml_str["groups"]
elif self._is_single_rule_format(yaml_str, self.rule_type):
if self._is_official_rule_format(yaml_dict):
groups = yaml_dict["groups"]
elif self._is_single_rule_format(yaml_dict, self.rule_type):
if not group_name:
# Note: the caller of this function should ensure this never happens:
# Either we use the standard format, or we'd pass a group_name.
# If/when we drop support for the single-rule-per-file format, this won't
# be needed anymore.
group_name = hashlib.shake_256(str(yaml_str).encode("utf-8")).hexdigest(10)
group_name = hashlib.shake_256(str(yaml_dict).encode("utf-8")).hexdigest(10)

# convert to list of groups to match official rule format
groups = [{"name": group_name, "rules": [yaml_str]}]
groups = [{"name": group_name, "rules": [yaml_dict]}]
else:
# invalid/unsupported
raise ValueError("Invalid rule format")
Expand Down Expand Up @@ -366,19 +368,19 @@ def _sanitize_metric_name(self, metric_name: str) -> str:

def add(
self,
yaml_str: Dict[str, Any],
yaml_dict: Dict[str, Any],
group_name: Optional[str] = None,
group_name_prefix: Optional[str] = None,
) -> None:
"""Add rules from a string to the existing ruleset.
"""Add rules from dict to the existing ruleset.
Args:
yaml_str: a single-rule or official-rule YAML dict
yaml_dict: a single-rule or official-rule YAML dict
group_name: a custom group name, used only if the new rule is of single-rule format
group_name_prefix: a custom group name prefix, used only if the new rule is of single-rule format
"""
self.groups.extend(
self._from_str(yaml_str, group_name=group_name, group_name_prefix=group_name_prefix)
self._from_dict(yaml_dict, group_name=group_name, group_name_prefix=group_name_prefix)
)

def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None:
Expand Down
44 changes: 22 additions & 22 deletions tests/test_rules_promql.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def setUp(self):
"""
self.rule = yaml.safe_load(textwrap.dedent(rule))

def test_official_rule_add_alerts_from_string(self):
def test_official_rule_add_alerts_from_dict(self):
official_rule = """
groups:
- name: SomeGroupName
Expand All @@ -114,14 +114,14 @@ def test_official_rule_add_alerts_from_string(self):
}
# GIVEN an alert rule
rules = AlertRules(query_type="promql")
rules.groups = rules._from_str(self.rule, group_name="initial")
rules.groups = rules._from_dict(self.rule, group_name="initial")
# WHEN a rule is added from string in official-rule format
rules.add(yaml.safe_load(textwrap.dedent(official_rule)))
rules_dict = rules.as_dict()
# THEN the new rule is a combination of all
self.assertEqual({}, DeepDiff(expected_rules, rules_dict))

def test_single_rule_add_alerts_from_string(self):
def test_single_rule_add_alerts_from_dict(self):
single_rule = """
alert: BarRule
expr: up < 1
Expand All @@ -140,7 +140,7 @@ def test_single_rule_add_alerts_from_string(self):
}
# GIVEN an alert rule
rules = AlertRules(query_type="promql")
rules.groups = rules._from_str(self.rule, group_name="initial")
rules.groups = rules._from_dict(self.rule, group_name="initial")
# WHEN a rule is added from string in single-rule format with a custom group name and prefix
rules.add(
yaml.safe_load(textwrap.dedent(single_rule)),
Expand All @@ -153,7 +153,7 @@ def test_single_rule_add_alerts_from_string(self):
self.assertEqual({}, DeepDiff(expected_rules, rules_dict))


class TestFromStrGroupName(unittest.TestCase):
class TestFromDictGroupName(unittest.TestCase):
def setUp(self):
official_rule = """
groups:
Expand All @@ -175,11 +175,11 @@ def setUp(self):
self.official_rule = yaml.safe_load(textwrap.dedent(official_rule))
self.single_rule = yaml.safe_load(textwrap.dedent(single_rule))

def test_single_rule_from_string_group_sanitized(self):
def test_single_rule_from_dict_group_sanitized(self):
# GIVEN an alert rule in single-rule format
rules = AlertRules(query_type="promql")
# WHEN processed as string and provided an illegal custom group name
groups = rules._from_str(
groups = rules._from_dict(
self.single_rule, group_name="Foo$123/Hello:World(go_od)bye!@#^&*()[]{}|;:,.<>?`~_"
)
for group in groups:
Expand All @@ -189,61 +189,61 @@ def test_single_rule_from_string_group_sanitized(self):
group["name"], "Foo_123_Hello:World_go_od_bye______________:_________alerts"
)

def test_single_rule_from_string(self):
def test_single_rule_from_dict(self):
# GIVEN an alert rule in single-rule format
rules = AlertRules(query_type="promql")
# WHEN processed as string
groups = rules._from_str(self.single_rule)
groups = rules._from_dict(self.single_rule)
for group in groups:
# THEN group name contains hash
self.assertNotEqual(get_hash(group["name"]), "")

def test_single_rule_from_string_custom_group(self):
def test_single_rule_from_dict_custom_group(self):
# GIVEN an alert rule in single-rule format
rules = AlertRules(query_type="promql")
# WHEN processed as string and provided custom group name
groups = rules._from_str(self.single_rule, group_name="foo")
groups = rules._from_dict(self.single_rule, group_name="foo")
for group in groups:
# THEN group name does not contain hash
self.assertEqual(get_hash(group["name"]), "")
# AND group name contains the custom group name
self.assertIn("foo", group["name"])

def test_single_rule_from_string_custom_group_prefix(self):
def test_single_rule_from_dict_custom_group_prefix(self):
# GIVEN an alert rule in single-rule format
rules = AlertRules(query_type="promql")
# WHEN processed as string and provided custom group name prefix
groups = rules._from_str(self.single_rule, group_name_prefix="foo")
groups = rules._from_dict(self.single_rule, group_name_prefix="foo")
for group in groups:
# THEN group name does not include the original group name
self.assertNotIn("SomeGroupName", group["name"])
# AND group name is prefixed with custom value
self.assertTrue(group["name"].startswith("foo"))

def test_official_rule_from_string(self):
def test_official_rule_from_dict(self):
# GIVEN an alert rule in official-rule format
rules = AlertRules(query_type="promql")
# WHEN processed as string
groups = rules._from_str(self.official_rule)
# WHEN processed as dict
groups = rules._from_dict(self.official_rule)
for group in groups:
# THEN group name matches the group name in the alert
self.assertIn("SomeGroupName", group["name"])

def test_official_rule_from_string_custom_group_prefix(self):
def test_official_rule_from_dict_custom_group_prefix(self):
# GIVEN an alert rule in official-rule format
rules = AlertRules(query_type="promql")
# WHEN processed as string and provided custom group name prefix
groups = rules._from_str(self.official_rule, group_name_prefix="foo")
groups = rules._from_dict(self.official_rule, group_name_prefix="foo")
for group in groups:
# THEN group name includes the original group name
self.assertIn("SomeGroupName", group["name"])
# AND group name is prefixed with custom value
self.assertTrue(group["name"].startswith("foo"))

def test_raises_value_error_empty_str(self):
def test_raises_value_error_empty_dict(self):
rules = AlertRules(query_type="promql")
with self.assertRaises(ValueError) as ctx:
rules._from_str("")
rules._from_dict("")
self.assertEqual(str(ctx.exception), "Empty")

def test_raises_value_error_invalid_rule_format(self):
Expand All @@ -255,7 +255,7 @@ def test_raises_value_error_invalid_rule_format(self):
"""
rules = AlertRules(query_type="promql")
with self.assertRaises(ValueError) as ctx:
rules._from_str(yaml.safe_load(textwrap.dedent(invalid_rule)))
rules._from_dict(yaml.safe_load(textwrap.dedent(invalid_rule)))
self.assertEqual(str(ctx.exception), "Invalid rule format")


Expand Down Expand Up @@ -283,7 +283,7 @@ def expression_labels(expr):


def get_hash(group_name: str) -> str:
"""Extract the hash of the group name when a group name was not provided to _from_str method in the Rules class.
"""Extract the hash of the group name when a group name was not provided to _from_dict method in the Rules class.
This occurs when the single-rule format is provided rather than official-rule format.
Expand Down

0 comments on commit 7b458ea

Please sign in to comment.