diff --git a/.gitignore b/.gitignore index 360e31ef..94b79289 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,5 @@ python/protos/**/* python/build python/docs/sift_py *.egg-info/ + +.vscode/* diff --git a/python/examples/ingestion_with_python_config/rule_modules/velocity.yml b/python/examples/ingestion_with_python_config/rule_modules/velocity.yml new file mode 100644 index 00000000..5505580a --- /dev/null +++ b/python/examples/ingestion_with_python_config/rule_modules/velocity.yml @@ -0,0 +1,12 @@ +namespace: velocity + +rules: + - name: vehicle_stuck + description: Triggers if the vehicle velocity is not 0 for 5s after entering accelerating state + expression: $1 == "Accelerating" && persistence($2 == 0, 5) + type: review + + - name: vehicle_not_stopped + description: Triggers if the vehicle velocity does not remain 0 while stopped + expression: $1 == "Stopped" && $2 > 0 + type: review diff --git a/python/examples/ingestion_with_python_config/rule_modules/voltage.yml b/python/examples/ingestion_with_python_config/rule_modules/voltage.yml new file mode 100644 index 00000000..b47e7216 --- /dev/null +++ b/python/examples/ingestion_with_python_config/rule_modules/voltage.yml @@ -0,0 +1,12 @@ +namespace: voltage + +rules: + - name: overvoltage + description: Checks for overvoltage while accelerating + expression: $1 == "Accelerating" && $2 > 80 + type: review + + - name: undervoltage + description: Checks for undervoltage while accelerating + expression: $1 == "Accelerating" && $2 < 40 + type: review diff --git a/python/examples/ingestion_with_python_config/telemetry_config.py b/python/examples/ingestion_with_python_config/telemetry_config.py index 3003a1a7..9f8fa059 100644 --- a/python/examples/ingestion_with_python_config/telemetry_config.py +++ b/python/examples/ingestion_with_python_config/telemetry_config.py @@ -7,13 +7,14 @@ ChannelEnumType, ) from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig -from sift_py.ingestion.config.yaml.load import load_named_expression_modules +from sift_py.ingestion.config.yaml.load import load_named_expression_modules, load_rule_namespaces from sift_py.ingestion.rule.config import ( RuleActionCreateDataReviewAnnotation, RuleConfig, ) EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules") +RULE_NAMESPACES_DIR = Path().joinpath("rule_modules") def nostromos_lv_426() -> TelemetryConfig: @@ -24,6 +25,12 @@ def nostromos_lv_426() -> TelemetryConfig: ] ) + rule_namespaces = load_rule_namespaces( + [ + RULE_NAMESPACES_DIR, + ] + ) + log_channel = ChannelConfig( name="log", data_type=ChannelDataType.STRING, @@ -124,6 +131,70 @@ def nostromos_lv_426() -> TelemetryConfig: tags=["nostromo", "failure"], ), ), + RuleConfig( + name="overvoltage", + namespace="voltage", + namespace_rules=rule_namespaces, + channel_references=[ + # INFO: Can use either "channel_identifier" or "channel_config" + { + "channel_reference": "$1", + "channel_identifier": vehicle_state_channel.fqn(), + }, + { + "channel_reference": "$2", + "channel_config": voltage_channel, + }, + ], + ), + RuleConfig( + name="undervoltage", + namespace="voltage", + namespace_rules=rule_namespaces, + channel_references=[ + # INFO: Can use either "channel_identifier" or "channel_config" + { + "channel_reference": "$1", + "channel_identifier": vehicle_state_channel.fqn(), + }, + { + "channel_reference": "$2", + "channel_config": voltage_channel, + }, + ], + ), + RuleConfig( + name="vehicle_stuck", + namespace="velocity", + namespace_rules=rule_namespaces, + channel_references=[ + # INFO: Can use either "channel_identifier" or "channel_config" + { + "channel_reference": "$1", + "channel_identifier": vehicle_state_channel.fqn(), + }, + { + "channel_reference": "$2", + "channel_config": velocity_channel, + }, + ], + ), + RuleConfig( + name="vehicle_not_stopped", + namespace="velocity", + namespace_rules=rule_namespaces, + channel_references=[ + # INFO: Can use either "channel_identifier" or "channel_config" + { + "channel_reference": "$1", + "channel_identifier": vehicle_state_channel.fqn(), + }, + { + "channel_reference": "$2", + "channel_config": velocity_channel, + }, + ], + ), ], flows=[ FlowConfig( diff --git a/python/examples/ingestion_with_yaml_config/rule_modules/velocity.yml b/python/examples/ingestion_with_yaml_config/rule_modules/velocity.yml new file mode 100644 index 00000000..5505580a --- /dev/null +++ b/python/examples/ingestion_with_yaml_config/rule_modules/velocity.yml @@ -0,0 +1,12 @@ +namespace: velocity + +rules: + - name: vehicle_stuck + description: Triggers if the vehicle velocity is not 0 for 5s after entering accelerating state + expression: $1 == "Accelerating" && persistence($2 == 0, 5) + type: review + + - name: vehicle_not_stopped + description: Triggers if the vehicle velocity does not remain 0 while stopped + expression: $1 == "Stopped" && $2 > 0 + type: review diff --git a/python/examples/ingestion_with_yaml_config/rule_modules/voltage.yml b/python/examples/ingestion_with_yaml_config/rule_modules/voltage.yml new file mode 100644 index 00000000..b47e7216 --- /dev/null +++ b/python/examples/ingestion_with_yaml_config/rule_modules/voltage.yml @@ -0,0 +1,12 @@ +namespace: voltage + +rules: + - name: overvoltage + description: Checks for overvoltage while accelerating + expression: $1 == "Accelerating" && $2 > 80 + type: review + + - name: undervoltage + description: Checks for undervoltage while accelerating + expression: $1 == "Accelerating" && $2 < 40 + type: review diff --git a/python/examples/ingestion_with_yaml_config/telemetry_config.py b/python/examples/ingestion_with_yaml_config/telemetry_config.py index aab6a1ed..653e70db 100644 --- a/python/examples/ingestion_with_yaml_config/telemetry_config.py +++ b/python/examples/ingestion_with_yaml_config/telemetry_config.py @@ -5,6 +5,7 @@ TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs") EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules") +RULE_MODULES_DIR = Path().joinpath("rule_modules") def nostromos_lv_426() -> TelemetryConfig: @@ -15,11 +16,12 @@ def nostromos_lv_426() -> TelemetryConfig: telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath(telemetry_config_name) - # Load your telemetry config with your reusable expressions modules + # Load your telemetry config with your reusable expressions modules and rule modules return TelemetryConfig.try_from_yaml( telemetry_config_path, [ EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"), EXPRESSION_MODULES_DIR.joinpath("string.yml"), ], + [RULE_MODULES_DIR], ) diff --git a/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml b/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml index 1a938eff..f4999406 100644 --- a/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml +++ b/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml @@ -88,6 +88,30 @@ rules: - failure - nostromo + - namespace: voltage + name: overvoltage + channel_references: + - $1: *vehicle_state_channel + - $2: *voltage_channel + + - namespace: voltage + name: undervoltage + channel_references: + - $1: *vehicle_state_channel + - $2: *voltage_channel + + - namespace: velocity + name: vehicle_stuck + channel_references: + - $1: *vehicle_state_channel + - $2: *velocity_channel + + - namespace: velocity + name: vehicle_not_stopped + channel_references: + - $1: *vehicle_state_channel + - $2: *velocity_channel + flows: - name: readings channels: @@ -103,7 +127,7 @@ flows: - name: gpio_channel channels: - <<: *gpio_channel - + - name: logs channels: - <<: *log_channel diff --git a/python/lib/sift_py/ingestion/config/telemetry.py b/python/lib/sift_py/ingestion/config/telemetry.py index ac1b1846..e17d25da 100644 --- a/python/lib/sift_py/ingestion/config/telemetry.py +++ b/python/lib/sift_py/ingestion/config/telemetry.py @@ -12,7 +12,11 @@ ChannelEnumType, _channel_fqn, ) -from sift_py.ingestion.config.yaml.load import load_named_expression_modules, read_and_validate +from sift_py.ingestion.config.yaml.load import ( + load_named_expression_modules, + load_rule_namespaces, + read_and_validate, +) from sift_py.ingestion.config.yaml.spec import TelemetryConfigYamlSpec from sift_py.ingestion.flow import FlowConfig from sift_py.ingestion.rule.config import ( @@ -111,6 +115,7 @@ def try_from_yaml( cls, path: Path, named_expression_modules: Optional[List[Path]] = None, + named_rule_modules: Optional[List[Path]] = None, ) -> Self: """ Initializes a telemetry config from a YAML file found at the provided `path` as well as optional @@ -119,17 +124,21 @@ def try_from_yaml( config_as_yaml = read_and_validate(path) + named_expressions = {} + rule_namespaces = {} if named_expression_modules is not None: named_expressions = load_named_expression_modules(named_expression_modules) - return cls._from_yaml(config_as_yaml, named_expressions) - else: - return cls._from_yaml(config_as_yaml) + if named_rule_modules is not None: + rule_namespaces = load_rule_namespaces(named_rule_modules) + + return cls._from_yaml(config_as_yaml, named_expressions, rule_namespaces) @classmethod def _from_yaml( cls, config_as_yaml: TelemetryConfigYamlSpec, named_expressions: Dict[str, str] = {}, + rule_namespaces: Dict[str, List] = {}, ) -> Self: rules = [] flows = [] @@ -179,16 +188,21 @@ def _from_yaml( ) for rule in config_as_yaml.get("rules", []): - annotation_type = RuleActionAnnotationKind.from_str(rule["type"]) - - tags = rule.get("tags") - - action: RuleAction = RuleActionCreatePhaseAnnotation(tags) - if annotation_type == RuleActionAnnotationKind.REVIEW: - action = RuleActionCreateDataReviewAnnotation( - assignee=rule.get("assignee"), - tags=tags, - ) + namespace = rule.get("namespace", "") + + action: Optional[RuleAction] = None + description: str = "" + if not namespace: + annotation_type = RuleActionAnnotationKind.from_str(rule["type"]) + tags = rule.get("tags") + description = rule.get("description", "") + + action = RuleActionCreatePhaseAnnotation(tags) + if annotation_type == RuleActionAnnotationKind.REVIEW: + action = RuleActionCreateDataReviewAnnotation( + assignee=rule.get("assignee"), + tags=tags, + ) channel_references: List[ ExpressionChannelReference | ExpressionChannelReferenceChannelConfig @@ -206,15 +220,17 @@ def _from_yaml( } ) - expression = rule["expression"] + expression = rule.get("expression", "") if isinstance(expression, str): rules.append( RuleConfig( name=rule["name"], - description=rule.get("description", ""), + description=description, expression=expression, action=action, channel_references=channel_references, + namespace=namespace, + namespace_rules=rule_namespaces, ) ) else: @@ -237,11 +253,13 @@ def _from_yaml( rules.append( RuleConfig( name=rule["name"], - description=rule.get("description", ""), + description=description, expression=expr, action=action, channel_references=channel_references, sub_expressions=sub_exprs, + namespace=namespace, + namespace_rules=rule_namespaces, ) ) diff --git a/python/lib/sift_py/ingestion/config/telemetry_test.py b/python/lib/sift_py/ingestion/config/telemetry_test.py index 54e58042..bc54768a 100644 --- a/python/lib/sift_py/ingestion/config/telemetry_test.py +++ b/python/lib/sift_py/ingestion/config/telemetry_test.py @@ -39,10 +39,33 @@ def test_telemetry_config_load_from_yaml(mocker: MockFixture): "kinetic_energy_gt": "0.5 * $mass * $1 * $1 > $threshold", } + mock_load_rule_namespaces = mocker.patch( + _mock_path(sift_py.ingestion.config.yaml.load.load_rule_namespaces) + ) + mock_load_rule_namespaces.return_value = { + "velocity": [ + { + "name": "vehicle_stuck", + "description": "Checks that vehicle velocity becomes nonzero 5s after entering accelerating state", + "expression": '$1 == "Accelerating" && persistence($2 == 0, 5)', + "type": "review", + }, + { + "name": "vehicle_not_stopped", + "description": "Makes sure vehicle velocity remains 0 while stopped", + "expression": '$1 == "Stopped" && $2 > 0', + "type": "review", + }, + ] + } + dummy_yaml_path = Path() dummy_named_expr_mod_path = Path() + dummy_rule_namespace_path = [Path()] - telemetry_config = TelemetryConfig.try_from_yaml(dummy_yaml_path, [dummy_named_expr_mod_path]) + telemetry_config = TelemetryConfig.try_from_yaml( + dummy_yaml_path, [dummy_named_expr_mod_path], dummy_rule_namespace_path + ) assert telemetry_config.asset_name == "LunarVehicle426" assert telemetry_config.ingestion_client_key == "lunar_vehicle_426" @@ -105,14 +128,21 @@ def test_telemetry_config_load_from_yaml(mocker: MockFixture): assert gpio_channel.bit_field_elements[3].index == 7 assert gpio_channel.bit_field_elements[3].bit_count == 1 - assert len(telemetry_config.rules) == 4 + assert len(telemetry_config.rules) == 6 - overheating_rule, speeding_rule, failures_rule, kinetic_energy_rule = telemetry_config.rules + ( + overheating_rule, + speeding_rule, + failures_rule, + kinetic_energy_rule, + vehicle_stuck, + vehicle_not_stopped, + ) = telemetry_config.rules assert overheating_rule.name == "overheating" assert overheating_rule.description == "Checks for vehicle overheating" assert overheating_rule.expression == '$1 == "Accelerating" && $2 > 80' - assert overheating_rule.action.kind() == RuleActionKind.ANNOTATION + assert overheating_rule.action.kind() == RuleActionKind.ANNOTATION # type: ignore assert isinstance(overheating_rule.action, RuleActionCreateDataReviewAnnotation) assert speeding_rule.name == "speeding" @@ -133,6 +163,21 @@ def test_telemetry_config_load_from_yaml(mocker: MockFixture): assert overheating_rule.action.kind() == RuleActionKind.ANNOTATION assert isinstance(kinetic_energy_rule.action, RuleActionCreateDataReviewAnnotation) + assert vehicle_stuck.name == "vehicle_stuck" + assert ( + vehicle_stuck.description + == "Checks that vehicle velocity becomes nonzero 5s after entering accelerating state" + ) + assert vehicle_stuck.expression == '$1 == "Accelerating" && persistence($2 == 0, 5)' + assert vehicle_stuck.action.kind() == RuleActionKind.ANNOTATION # type: ignore + assert isinstance(vehicle_stuck.action, RuleActionCreateDataReviewAnnotation) + + assert vehicle_not_stopped.name == "vehicle_not_stopped" + assert vehicle_not_stopped.description == "Makes sure vehicle velocity remains 0 while stopped" + assert vehicle_not_stopped.expression == '$1 == "Stopped" && $2 > 0' + assert vehicle_not_stopped.action.kind() == RuleActionKind.ANNOTATION # type: ignore + assert isinstance(vehicle_not_stopped.action, RuleActionCreateDataReviewAnnotation) + def test_telemetry_config_err_if_duplicate_channels_in_flow(mocker: MockerFixture): """ @@ -361,6 +406,18 @@ def test_telemetry_config_validations_flows_with_same_name(): tags: - nostromo + - namespace: velocity + name: vehicle_stuck + channel_references: + - $1: *vehicle_state_channel + - $2: *velocity_channel + + - namespace: velocity + name: vehicle_not_stopped + channel_references: + - $1: *vehicle_state_channel + - $2: *velocity_channel + flows: - name: readings channels: diff --git a/python/lib/sift_py/ingestion/config/yaml/load.py b/python/lib/sift_py/ingestion/config/yaml/load.py index 7a3ed58a..fce7bca5 100644 --- a/python/lib/sift_py/ingestion/config/yaml/load.py +++ b/python/lib/sift_py/ingestion/config/yaml/load.py @@ -11,6 +11,7 @@ ChannelConfigYamlSpec, ChannelEnumTypeYamlSpec, FlowYamlSpec, + RuleNamespaceYamlSpec, RuleYamlSpec, TelemetryConfigYamlSpec, ) @@ -34,7 +35,7 @@ def load_named_expression_modules(paths: List[Path]) -> Dict[str, str]: """ Takes in a list of paths to YAML files which contains named expressions and processes them into a `dict`. The key is the name of the expression and the value is the expression itself. For more information on - named expression modules see `sift_py.ingestion/config/yaml/spec.py + named expression modules see `sift_py.ingestion/config/yaml/spec.py`. """ named_expressions = {} @@ -52,6 +53,42 @@ def load_named_expression_modules(paths: List[Path]) -> Dict[str, str]: return named_expressions +def load_rule_namespaces(paths: List[Path]) -> Dict[str, List]: + """ + Takes in a list of paths which may either be directories or files containing rule namespace YAML files, + and processes them into a `dict`. For more information on rule namespaces see + RuleNamespaceYamlSpec in `sift_py.ingestion/config/yaml/spec.py`. + """ + + rule_namespaces: Dict[str, List] = {} + + def update_rule_namespaces(rule_module_path: Path): + rule_module = _read_rule_namespace_yaml(rule_module_path) + + for key in rule_module.keys(): + if key in rule_namespaces: + raise YamlConfigError( + f"Encountered rules with identical names being loaded, '{key}'." + ) + + rule_namespaces.update(rule_module) + + def handle_dir(path: Path): + for file_in_dir in path.iterdir(): + if file_in_dir.is_dir(): + handle_dir(file_in_dir) + elif file_in_dir.is_file(): + update_rule_namespaces(file_in_dir) + + for path in paths: + if path.is_dir(): + handle_dir(path) + elif path.is_file(): + update_rule_namespaces(path) + + return rule_namespaces + + def _read_named_expression_module_yaml(path: Path) -> Dict[str, str]: with open(path, "r") as f: named_expressions = cast(Dict[Any, Any], yaml.safe_load(f.read())) @@ -69,6 +106,36 @@ def _read_named_expression_module_yaml(path: Path) -> Dict[str, str]: return cast(Dict[str, str], named_expressions) +def _read_rule_namespace_yaml(path: Path) -> Dict[str, List]: + with open(path, "r") as f: + namespace_rules = cast(Dict[Any, Any], yaml.safe_load(f.read())) + namespace = namespace_rules.get("namespace") + + if not isinstance(namespace, str): + raise YamlConfigError( + f"Expected '{namespace} to be a string in rule namespace yaml: '{path}'" + f"{_type_fqn(RuleNamespaceYamlSpec)}" + ) + + rules = namespace_rules.get("rules") + if not isinstance(namespace, str): + raise YamlConfigError( + f"Expected '{rules}' to be a list in rule namespace yaml: '{path}'" + f"{_type_fqn(RuleNamespaceYamlSpec)}" + ) + + for rule in cast(List[Any], rules): + nested_namespace = rule.get("namespace") + if nested_namespace: + raise YamlConfigError( + "Rules referencing other namespaces cannot be nested. " + f"Found nested namespace '{nested_namespace}' in '{path}'. " + ) + _validate_rule(rule) + + return {namespace: cast(List[Any], rules)} + + def _validate_yaml(raw_config: Dict[Any, Any]) -> TelemetryConfigYamlSpec: asset_name = raw_config.get("asset_name") @@ -105,8 +172,8 @@ def _validate_yaml(raw_config: Dict[Any, Any]) -> TelemetryConfigYamlSpec: if rules is not None: if not isinstance(rules, list): raise YamlConfigError._invalid_property( - channels, - "channels", + rules, + "rules", f"List[{_type_fqn(RuleYamlSpec)}]", None, ) @@ -275,18 +342,53 @@ def _validate_bit_field_element(val: Any): def _validate_rule(val: Any): rule = cast(Dict[Any, Any], val) + namespace = rule.get("namespace") + if namespace is not None and not isinstance(namespace, str): + raise YamlConfigError._invalid_property( + namespace, + "- namespace", + "str", + ["rules"], + ) + name = rule.get("name") if not isinstance(name, str): raise YamlConfigError._invalid_property(name, "- name", "str", ["rules"]) + channel_references = rule.get("channel_references") + + if namespace or (channel_references is not None): + if not isinstance(channel_references, list): + raise YamlConfigError._invalid_property( + channel_references, + "- channel_references", + f"List[Dict[str, {_type_fqn(ChannelConfigYamlSpec)}]]", + ["rules"], + ) + + for channel_reference in cast(List[Any], channel_references): + _validate_channel_reference(channel_reference) + description = rule.get("description") + expression = rule.get("expression") + rule_type = rule.get("type") + assignee = rule.get("assignee") + tags = rule.get("tags") + sub_expressions = rule.get("sub_expressions") + + if namespace: + if any([description, expression, rule_type, assignee, tags, sub_expressions]): + raise YamlConfigError( + f"Rule '{name}' is a namespace and should not have any other properties set. " + "Properties 'description', 'expression', 'type', 'assignee', 'tags', and 'sub_expressions' " + "may be defined in the referenced namespace." + ) + return if description is not None and not isinstance(description, str): raise YamlConfigError._invalid_property(description, "- description", "str", ["rules"]) - expression = rule.get("expression") - if isinstance(expression, dict): expression_name = cast(Dict[Any, Any], expression).get("name") @@ -306,7 +408,6 @@ def _validate_rule(val: Any): ["rules"], ) - rule_type = rule.get("type") valid_rule_types = [kind.value for kind in RuleActionAnnotationKind] if rule_type not in valid_rule_types: @@ -317,8 +418,6 @@ def _validate_rule(val: Any): ["rules"], ) - assignee = rule.get("assignee") - if assignee is not None and not isinstance(assignee, str): raise YamlConfigError._invalid_property( assignee, @@ -327,8 +426,6 @@ def _validate_rule(val: Any): ["rules"], ) - tags = rule.get("tags") - if tags is not None and not isinstance(tags, list): raise YamlConfigError._invalid_property( tags, @@ -337,22 +434,6 @@ def _validate_rule(val: Any): ["rules"], ) - channel_references = rule.get("channel_references") - - if channel_references is not None: - if not isinstance(channel_references, list): - raise YamlConfigError._invalid_property( - channel_references, - "- channel_references", - f"List[Dict[str, {_type_fqn(ChannelConfigYamlSpec)}]]", - ["rules"], - ) - - for channel_reference in cast(List[Any], channel_references): - _validate_channel_reference(channel_reference) - - sub_expressions = rule.get("sub_expressions") - if sub_expressions is not None: if not isinstance(channel_references, list): raise YamlConfigError._invalid_property( diff --git a/python/lib/sift_py/ingestion/config/yaml/spec.py b/python/lib/sift_py/ingestion/config/yaml/spec.py index 4a024c91..33791873 100644 --- a/python/lib/sift_py/ingestion/config/yaml/spec.py +++ b/python/lib/sift_py/ingestion/config/yaml/spec.py @@ -90,11 +90,24 @@ class FlowYamlSpec(TypedDict): channels: List[ChannelConfigYamlSpec] +class RuleNamespaceYamlSpec(TypedDict): + """ + The formal definition of what a rule namespace looks like in YAML. + + `namespace`: Name of the namespace. + `rules`: A list of rules that belong to the namespace. + """ + + namespace: str + rules: List[RuleYamlSpec] + + class RuleYamlSpec(TypedDict): """ The formal definition of what a single rule looks like in YAML. `name`: Name of the rule. + `namespace`: Optional namespace of the rule. Only used if referencing a rule defined in a namespace. `description`: Description of rule. `expression`: Either an expression-string or a `sift_py.ingestion.config.yaml.spec.NamedExpressionYamlSpec` referencing a named expression. @@ -104,6 +117,26 @@ class RuleYamlSpec(TypedDict): `channel_references`: A list of channel references that maps to an actual channel. More below. `sub_expressions`: A list of sub-expressions which is a mapping of place-holders to sub-expressions. Only used if using named expressions. + Namespaces: + Rule may be defined in a separate YAML within a namespace. The reference to the namespace rule would look like the following: + ```yaml + rules: + - namespace: voltage + name: overvoltage + channel_references: + - $1: *vehicle_state_channel + - $2: *voltage_channel + ``` + With the corresponding rule being defined in a separate YAML file like the following: + ```yaml + namespace: voltage + rules: + - name: overvoltage + description: Checks for overvoltage while accelerating + expression: $1 == "Accelerating" && $2 > 80 + type: review + ``` + Channel references: A channel reference is a string containing a numerical value prefixed with "$". Examples include "$1", "$2", "$11", and so on. The channel reference is mapped to an actual channel config. In YAML it would look something like this: @@ -146,6 +179,7 @@ class RuleYamlSpec(TypedDict): """ name: str + namespace: NotRequired[str] description: NotRequired[str] expression: Union[str, NamedExpressionYamlSpec] type: Union[Literal["phase"], Literal["review"]] diff --git a/python/lib/sift_py/ingestion/config/yaml/test_load.py b/python/lib/sift_py/ingestion/config/yaml/test_load.py index ea80403b..4ffc75fe 100644 --- a/python/lib/sift_py/ingestion/config/yaml/test_load.py +++ b/python/lib/sift_py/ingestion/config/yaml/test_load.py @@ -199,6 +199,18 @@ def test__validate_rule(): } ) + # Rule in referenced namespace + load._validate_rule( + { + "namespace": "voltage", + "name": "overvoltage_rule", + "channel_references": [ + {"$1": {"name": "voltage", "data_type": "double"}}, + {"$2": {"name": "vehicle_state", "data_type": "double"}}, + ], + } + ) + with pytest.raises(YamlConfigError, match="Expected 'name' to be but it is "): load._validate_rule( { @@ -315,6 +327,23 @@ def test__validate_rule(): } ) + with pytest.raises(YamlConfigError, match="should not have any other properties set"): + load._validate_rule( + { + "name": "overheat_rule", + "namespace": "my_namespace", + "description": "some_description", + "expression": "$1 > 10 && $2 > 10", + "type": "review", + "assignee": "homer@example.com", + "tags": ["foo", "bar"], + "channel_references": [ + {"$1": {"name": "voltage", "data_type": "double"}}, + {"$2": {"name": "vehicle_state", "data_type": "double"}}, + ], + } + ) + def test__validate_flow(): load._validate_flow( diff --git a/python/lib/sift_py/ingestion/rule/config.py b/python/lib/sift_py/ingestion/rule/config.py index 984d0f67..4d948ccf 100644 --- a/python/lib/sift_py/ingestion/rule/config.py +++ b/python/lib/sift_py/ingestion/rule/config.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Dict, List, Optional, TypedDict, Union, cast +from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union, cast from sift.annotations.v1.annotations_pb2 import AnnotationType from sift.rules.v1.rules_pb2 import ActionKind @@ -26,19 +26,21 @@ class RuleConfig(AsJson): name: str description: str expression: str - action: RuleAction + action: Optional[RuleAction] channel_references: List[ExpressionChannelReference] def __init__( self, name: str, - description: str, - expression: str, - action: RuleAction, channel_references: List[ Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] ], + description: str = "", + expression: str = "", + action: Optional[RuleAction] = None, sub_expressions: Dict[str, Any] = {}, + namespace: str = "", + namespace_rules: Dict[str, List[Dict]] = {}, ): self.channel_references = [] @@ -65,8 +67,14 @@ def __init__( ) self.name = name - self.description = description + + if namespace: + description, expression, action = self.__class__.interpolate_namespace_rule( + name, namespace, namespace_rules + ) + self.action = action + self.description = description self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions) def as_json(self) -> Any: @@ -98,22 +106,59 @@ def as_json(self) -> Any: if self.action.tags is not None and len(self.action.tags) > 0: hash_map["tags"] = self.action.tags else: - raise TypeError(f"Unsupported rule action '{self.action.kind()}'.") + kind = self.action.kind() if self.action else self.action + raise TypeError(f"Unsupported rule action '{kind}'.") return hash_map @staticmethod - def interpolate_sub_expressions(expression: str, sub_expressions: Dict[str, str]) -> str: - for ref, expr in sub_expressions.items(): - if ref not in expression: - raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.") - if isinstance(expr, str): - expression = expression.replace(ref, f'"{expr}"') - else: - expression = expression.replace(ref, str(expr)) + def interpolate_sub_expressions( + expression: str, sub_expressions: Optional[Dict[str, str]] + ) -> str: + if sub_expressions: + for ref, expr in sub_expressions.items(): + if ref not in expression: + raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.") + if isinstance(expr, str): + expression = expression.replace(ref, f'"{expr}"') + else: + expression = expression.replace(ref, str(expr)) return expression + @staticmethod + def interpolate_namespace_rule( + name: str, namespace: str, namespace_rules: Optional[Dict[str, List[Dict]]] + ) -> Tuple[str, str, RuleAction]: + if not namespace_rules: + raise ValueError( + f"Namespace rules must be provided with namespace key. Got: {namespace_rules}" + ) + + rule_list = namespace_rules.get(namespace) + if not rule_list: + raise ValueError( + f"Couldn't find namespace '{namespace}' in namespace_rules: {namespace_rules}" + ) + + for rule in rule_list: + candidate_name = rule.get("name") + if candidate_name == name: + description = rule.get("description", "") + expression = rule.get("expression", "") + type = rule.get("type", "") + tags = rule.get("tags") + action: RuleAction = RuleActionCreatePhaseAnnotation(tags) + if RuleActionAnnotationKind.from_str(type) == RuleActionAnnotationKind.REVIEW: + action = RuleActionCreateDataReviewAnnotation( + assignee=rule.get("assignee"), tags=tags + ) + return description, expression, action + + raise ValueError( + f"Could not find rule '{rule}'. Does this rule exist in the namespace? {rule_list}" + ) + class RuleAction(ABC): @abstractmethod diff --git a/python/lib/sift_py/ingestion/rule/config_test.py b/python/lib/sift_py/ingestion/rule/config_test.py index 8ebc6cca..1ffe97bf 100644 --- a/python/lib/sift_py/ingestion/rule/config_test.py +++ b/python/lib/sift_py/ingestion/rule/config_test.py @@ -1,8 +1,11 @@ +import pytest + from sift_py.ingestion.channel import ChannelConfig, ChannelDataType from .config import ( RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation, + RuleActionKind, RuleConfig, ) @@ -107,3 +110,147 @@ def test_rule_named_expressions(): }, ) assert rule_on_kinetic_energy.expression == "0.5 * 10 * $1 * $1 > 35" + + +def test_rule_namespace(): + namespace_rules = { + "valid_namespace": [ + { + "name": "valid_rule", + "description": "A rule in a namespace", + "expression": "$1 > 10", + "type": "review", + "assignee": "bob@example.com", + "tags": ["foo", "bar"], + }, + { + "name": "another_valid_rule", + "description": "Another rule in a namespace", + "expression": "$1 < 10", + "type": "review", + "assignee": "mary@example.com", + "tags": ["baz", "qux"], + }, + ] + } + + valid_namespace_rule = RuleConfig( + name="valid_rule", + namespace="valid_namespace", + namespace_rules=namespace_rules, + channel_references=[ + { + "channel_reference": "$1", + "channel_config": ChannelConfig( + name="a_channel", + data_type=ChannelDataType.DOUBLE, + ), + } + ], + ) + assert valid_namespace_rule.name == "valid_rule" + assert valid_namespace_rule.description == "A rule in a namespace" + assert valid_namespace_rule.expression == "$1 > 10" + assert valid_namespace_rule.action.assignee == "bob@example.com" + assert valid_namespace_rule.action.tags == ["foo", "bar"] + assert valid_namespace_rule.action.kind() == RuleActionKind.ANNOTATION + assert isinstance(valid_namespace_rule.action, RuleActionCreateDataReviewAnnotation) + + +def test_rule_namespace_missing_namespace_rules(): + with pytest.raises(ValueError, match="Namespace rules must be provided with namespace key."): + RuleConfig( + name="a_rule", + namespace="a_namespace", + channel_references=[ + { + "channel_reference": "$1", + "channel_config": ChannelConfig( + name="a_channel", + data_type=ChannelDataType.DOUBLE, + ), + } + ], + ) + + +def test_rule_namespace_missing_namespace(): + with pytest.raises(ValueError, match="Couldn't find namespace"): + namespace_rules = { + "a_namespace": [ + { + "name": "valid_rule", + "description": "A rule in a namespace", + "expression": "$1 > 10", + "type": "review", + "assignee": "bob@example.com", + "tags": ["foo", "bar"], + }, + ], + "another_namespace": [ + { + "name": "valid_rule", + "description": "A rule in a namespace", + "expression": "$1 > 10", + "type": "review", + "assignee": "bob@example.com", + "tags": ["foo", "bar"], + }, + ], + } + + RuleConfig( + name="valid_rule", + namespace="a_missing_namespace", + namespace_rules=namespace_rules, + channel_references=[ + { + "channel_reference": "$1", + "channel_config": ChannelConfig( + name="a_channel", + data_type=ChannelDataType.DOUBLE, + ), + } + ], + ) + + +def test_rule_namespace_missing_rule(): + with pytest.raises(ValueError, match="Does this rule exist in the namespace?"): + namespace_rules = { + "a_namespace": [ + { + "name": "a_rule_in_namespace", + "description": "A rule in a namespace", + "expression": "$1 > 10", + "type": "review", + "assignee": "bob@example.com", + "tags": ["foo", "bar"], + }, + ], + "another_namespace": [ + { + "name": "another_rule_in_namespace", + "description": "A rule in a namespace", + "expression": "$1 > 10", + "type": "review", + "assignee": "bob@example.com", + "tags": ["foo", "bar"], + }, + ], + } + + RuleConfig( + name="a_missing_rule", + namespace="a_namespace", + namespace_rules=namespace_rules, + channel_references=[ + { + "channel_reference": "$1", + "channel_config": ChannelConfig( + name="a_channel", + data_type=ChannelDataType.DOUBLE, + ), + } + ], + )