diff --git a/src/cosl/rules.py b/src/cosl/rules.py index e7547bb..2a3aef1 100644 --- a/src/cosl/rules.py +++ b/src/cosl/rules.py @@ -270,7 +270,7 @@ def _from_file( # noqa: C901 group_name_prefix = "_".join(filter(None, group_name_parts)) try: - groups = self._from_str( + groups = self._from_dict( rule_file, group_name=file_path.stem, group_name_prefix=group_name_prefix ) except ValueError as e: @@ -279,38 +279,40 @@ def _from_file( # noqa: C901 return groups - def _from_str( + def _from_dict( self, - yaml_str: Dict[str, Any], + yaml_dict: Dict[str, Any], *, group_name: Optional[str] = None, group_name_prefix: Optional[str] = None, ) -> List[OfficialRuleFileItem]: - """Process rules from string, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness. + """Process rules from dict, injecting juju topology. If a single-rule format is provided, a hash of the yaml file is injected into the group name to ensure uniqueness. Args: - yaml_str: rules content in single-rule or official-rule format as a YAML dict + yaml_dict: rules content in single-rule or official-rule format as a YAML dict group_name: a custom identifier for the rule name to include in the group name group_name_prefix: a custom group identifier to prefix the resulting group name, likely Juju topology and relative path context Raises: ValueError, when invalid rule format given. """ - if not yaml_str: + # TODO _alerts_alerts is leftover code from elsewhere + # TODO rename this to be yaml_dict + if not yaml_dict: raise ValueError("Empty") - if self._is_official_rule_format(yaml_str): - groups = yaml_str["groups"] - elif self._is_single_rule_format(yaml_str, self.rule_type): + if self._is_official_rule_format(yaml_dict): + groups = yaml_dict["groups"] + elif self._is_single_rule_format(yaml_dict, self.rule_type): if not group_name: # Note: the caller of this function should ensure this never happens: # Either we use the standard format, or we'd pass a group_name. # If/when we drop support for the single-rule-per-file format, this won't # be needed anymore. - group_name = hashlib.shake_256(str(yaml_str).encode("utf-8")).hexdigest(10) + group_name = hashlib.shake_256(str(yaml_dict).encode("utf-8")).hexdigest(10) # convert to list of groups to match official rule format - groups = [{"name": group_name, "rules": [yaml_str]}] + groups = [{"name": group_name, "rules": [yaml_dict]}] else: # invalid/unsupported raise ValueError("Invalid rule format") @@ -366,19 +368,19 @@ def _sanitize_metric_name(self, metric_name: str) -> str: def add( self, - yaml_str: Dict[str, Any], + yaml_dict: Dict[str, Any], group_name: Optional[str] = None, group_name_prefix: Optional[str] = None, ) -> None: - """Add rules from a string to the existing ruleset. + """Add rules from dict to the existing ruleset. Args: - yaml_str: a single-rule or official-rule YAML dict + yaml_dict: a single-rule or official-rule YAML dict group_name: a custom group name, used only if the new rule is of single-rule format group_name_prefix: a custom group name prefix, used only if the new rule is of single-rule format """ self.groups.extend( - self._from_str(yaml_str, group_name=group_name, group_name_prefix=group_name_prefix) + self._from_dict(yaml_dict, group_name=group_name, group_name_prefix=group_name_prefix) ) def add_path(self, dir_path: Union[str, Path], *, recursive: bool = False) -> None: diff --git a/tests/test_rules_promql.py b/tests/test_rules_promql.py index 675a175..a018370 100644 --- a/tests/test_rules_promql.py +++ b/tests/test_rules_promql.py @@ -88,7 +88,7 @@ def setUp(self): """ self.rule = yaml.safe_load(textwrap.dedent(rule)) - def test_official_rule_add_alerts_from_string(self): + def test_official_rule_add_alerts_from_dict(self): official_rule = """ groups: - name: SomeGroupName @@ -114,14 +114,14 @@ def test_official_rule_add_alerts_from_string(self): } # GIVEN an alert rule rules = AlertRules(query_type="promql") - rules.groups = rules._from_str(self.rule, group_name="initial") + rules.groups = rules._from_dict(self.rule, group_name="initial") # WHEN a rule is added from string in official-rule format rules.add(yaml.safe_load(textwrap.dedent(official_rule))) rules_dict = rules.as_dict() # THEN the new rule is a combination of all self.assertEqual({}, DeepDiff(expected_rules, rules_dict)) - def test_single_rule_add_alerts_from_string(self): + def test_single_rule_add_alerts_from_dict(self): single_rule = """ alert: BarRule expr: up < 1 @@ -140,7 +140,7 @@ def test_single_rule_add_alerts_from_string(self): } # GIVEN an alert rule rules = AlertRules(query_type="promql") - rules.groups = rules._from_str(self.rule, group_name="initial") + rules.groups = rules._from_dict(self.rule, group_name="initial") # WHEN a rule is added from string in single-rule format with a custom group name and prefix rules.add( yaml.safe_load(textwrap.dedent(single_rule)), @@ -153,7 +153,7 @@ def test_single_rule_add_alerts_from_string(self): self.assertEqual({}, DeepDiff(expected_rules, rules_dict)) -class TestFromStrGroupName(unittest.TestCase): +class TestFromDictGroupName(unittest.TestCase): def setUp(self): official_rule = """ groups: @@ -175,11 +175,11 @@ def setUp(self): self.official_rule = yaml.safe_load(textwrap.dedent(official_rule)) self.single_rule = yaml.safe_load(textwrap.dedent(single_rule)) - def test_single_rule_from_string_group_sanitized(self): + def test_single_rule_from_dict_group_sanitized(self): # GIVEN an alert rule in single-rule format rules = AlertRules(query_type="promql") # WHEN processed as string and provided an illegal custom group name - groups = rules._from_str( + groups = rules._from_dict( self.single_rule, group_name="Foo$123/Hello:World(go_od)bye!@#^&*()[]{}|;:,.<>?`~_" ) for group in groups: @@ -189,61 +189,61 @@ def test_single_rule_from_string_group_sanitized(self): group["name"], "Foo_123_Hello:World_go_od_bye______________:_________alerts" ) - def test_single_rule_from_string(self): + def test_single_rule_from_dict(self): # GIVEN an alert rule in single-rule format rules = AlertRules(query_type="promql") # WHEN processed as string - groups = rules._from_str(self.single_rule) + groups = rules._from_dict(self.single_rule) for group in groups: # THEN group name contains hash self.assertNotEqual(get_hash(group["name"]), "") - def test_single_rule_from_string_custom_group(self): + def test_single_rule_from_dict_custom_group(self): # GIVEN an alert rule in single-rule format rules = AlertRules(query_type="promql") # WHEN processed as string and provided custom group name - groups = rules._from_str(self.single_rule, group_name="foo") + groups = rules._from_dict(self.single_rule, group_name="foo") for group in groups: # THEN group name does not contain hash self.assertEqual(get_hash(group["name"]), "") # AND group name contains the custom group name self.assertIn("foo", group["name"]) - def test_single_rule_from_string_custom_group_prefix(self): + def test_single_rule_from_dict_custom_group_prefix(self): # GIVEN an alert rule in single-rule format rules = AlertRules(query_type="promql") # WHEN processed as string and provided custom group name prefix - groups = rules._from_str(self.single_rule, group_name_prefix="foo") + groups = rules._from_dict(self.single_rule, group_name_prefix="foo") for group in groups: # THEN group name does not include the original group name self.assertNotIn("SomeGroupName", group["name"]) # AND group name is prefixed with custom value self.assertTrue(group["name"].startswith("foo")) - def test_official_rule_from_string(self): + def test_official_rule_from_dict(self): # GIVEN an alert rule in official-rule format rules = AlertRules(query_type="promql") - # WHEN processed as string - groups = rules._from_str(self.official_rule) + # WHEN processed as dict + groups = rules._from_dict(self.official_rule) for group in groups: # THEN group name matches the group name in the alert self.assertIn("SomeGroupName", group["name"]) - def test_official_rule_from_string_custom_group_prefix(self): + def test_official_rule_from_dict_custom_group_prefix(self): # GIVEN an alert rule in official-rule format rules = AlertRules(query_type="promql") # WHEN processed as string and provided custom group name prefix - groups = rules._from_str(self.official_rule, group_name_prefix="foo") + groups = rules._from_dict(self.official_rule, group_name_prefix="foo") for group in groups: # THEN group name includes the original group name self.assertIn("SomeGroupName", group["name"]) # AND group name is prefixed with custom value self.assertTrue(group["name"].startswith("foo")) - def test_raises_value_error_empty_str(self): + def test_raises_value_error_empty_dict(self): rules = AlertRules(query_type="promql") with self.assertRaises(ValueError) as ctx: - rules._from_str("") + rules._from_dict("") self.assertEqual(str(ctx.exception), "Empty") def test_raises_value_error_invalid_rule_format(self): @@ -255,7 +255,7 @@ def test_raises_value_error_invalid_rule_format(self): """ rules = AlertRules(query_type="promql") with self.assertRaises(ValueError) as ctx: - rules._from_str(yaml.safe_load(textwrap.dedent(invalid_rule))) + rules._from_dict(yaml.safe_load(textwrap.dedent(invalid_rule))) self.assertEqual(str(ctx.exception), "Invalid rule format") @@ -283,7 +283,7 @@ def expression_labels(expr): def get_hash(group_name: str) -> str: - """Extract the hash of the group name when a group name was not provided to _from_str method in the Rules class. + """Extract the hash of the group name when a group name was not provided to _from_dict method in the Rules class. This occurs when the single-rule format is provided rather than official-rule format.