Skip to content

Commit

Permalink
chore: PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
niliayu committed Dec 6, 2024
1 parent aa4547c commit 080e763
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 65 deletions.
36 changes: 9 additions & 27 deletions python/examples/asset_agnostic_rules/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from dotenv import load_dotenv
from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.service import IngestionService, TelemetryConfig
from sift_py.rule.config import ExpressionChannelReference
from sift_py.rule.service import RuleService, SubExpression
from sift_py.rule.service import RuleChannelReference, RuleService, SubExpression
from simulator import Simulator

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
Expand Down Expand Up @@ -59,31 +58,14 @@
SubExpression("voltage.undervoltage", {"$1": 30}),
SubExpression("velocity.vehicle_not_stopped", {"$2": 10}),
],
channel_references_map={
"overvoltage": [
ExpressionChannelReference(
channel_reference="$2", channel_identifier="vehicle_state"
),
],
"undervoltage": [
ExpressionChannelReference(
channel_reference="$2", channel_identifier="vehicle_state"
),
],
"vehicle_stuck": [
ExpressionChannelReference(
channel_reference="$1", channel_identifier="vehicle_state"
),
ExpressionChannelReference(
channel_reference="$2", channel_identifier="mainmotor.velocity"
),
],
"vehicle_not_stopped": [
ExpressionChannelReference(
channel_reference="$1", channel_identifier="vehicle_state"
),
],
},
channel_references=[
RuleChannelReference("overvoltage", {"$2": "vehicle_state"}),
RuleChannelReference("undervoltage", {"$2": "vehicle_state"}),
RuleChannelReference(
"vehicle_stuck", {"$1": "vehicle_state", "$2": "mainmotor.velocity"}
),
RuleChannelReference("vehicle_not_stopped", {"$1": "vehicle_state"}),
],
)

# Create an optional run as part of this ingestion
Expand Down
101 changes: 63 additions & 38 deletions python/lib/sift_py/rule/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ def load_rules_from_yaml(
self,
paths: List[Path],
sub_expressions: Optional[List[SubExpression]] = None,
channel_references_map: Optional[
Dict[
str,
List[Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]],
]
] = None,
channel_references: Optional[List[RuleChannelReference]] = None,
) -> List[RuleConfig]:
"""
Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
Expand All @@ -86,40 +81,57 @@ def load_rules_from_yaml(
for namespace, rule_yamls in namespaced_rules.items():
for rule_yaml in rule_yamls:
yaml_channel_references = rule_yaml.get("channel_references", [])
arg_channel_references = (
channel_references_map.get(rule_yaml["name"])
if channel_references_map
else None
)
channel_references: List[

if channel_references:
for channel_map in channel_references:
if channel_map.fully_qualified_rule_name == rule_yaml["name"]:
arg_channel_references = channel_map.channel_references
else:
arg_channel_references = None

if yaml_channel_references and arg_channel_references:
raise ValueError(
f"Rule of name '{rule_yaml['name']}' cannot have both YAML and channel_references argument provided. "
"Please provide only one or the other."
)

rule_channel_references: List[
Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
] = []

def parse_channel_refs(channel_ref: Dict[str, Any]):
for ref, channel_config in channel_ref.items():
if isinstance(channel_config, dict):
name = channel_config.get("name", "")
component = channel_config.get("component", "")
elif isinstance(channel_config, str):
channel_reference = channel_reference_from_fqn(channel_config)
name = channel_reference.name
component = channel_reference.component
else:
raise ValueError(
f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
)

rule_channel_references.append(
{
"channel_reference": ref,
"channel_identifier": channel_fqn(
{
"channel_name": name,
"component": component,
}
),
}
)

if yaml_channel_references:
for channel_ref in yaml_channel_references:
for ref, config in channel_ref.items():
channel_references.append(
{
"channel_reference": ref,
"channel_identifier": channel_fqn(
{
"channel_name": config.get("name", ""),
"component": config.get("component", ""),
}
),
}
)
parse_channel_refs(channel_ref)
elif arg_channel_references:
channel_references = cast(
List[
Union[
ExpressionChannelReference, ExpressionChannelReferenceChannelConfig
]
],
arg_channel_references,
)
parse_channel_refs(arg_channel_references)

if not channel_references:
if not rule_channel_references:
raise ValueError(
f"Rule of name '{rule_yaml['name']}' requires channel_references"
)
Expand All @@ -136,7 +148,7 @@ def load_rules_from_yaml(
rule_client_key=rule_yaml.get("rule_client_key"),
description=rule_yaml.get("description", ""),
expression=cast(str, rule_yaml["expression"]),
channel_references=channel_references,
channel_references=rule_channel_references,
asset_names=rule_yaml.get("asset_names", []),
sub_expressions=rule_subexpr,
)
Expand Down Expand Up @@ -245,15 +257,14 @@ def search_channels(filter="", page_size=1_000, page_token="") -> Tuple[List[Cha
found_channels = []
filter = f"asset_id == '{asset.asset_id}' && {name_in} && {component_in}"
channels, next_page_token = search_channels( # Initialize next_page_token
filter,
"",
filter=filter,
)
found_channels.extend([channel.name for channel in channels])

while len(next_page_token) > 0:
channels, next_page_token = search_channels(
filter,
next_page_token,
filter=filter,
page_token=next_page_token,
)
found_channels.extend([channel.name for channel in channels])

Expand Down Expand Up @@ -327,3 +338,17 @@ class SubExpression:
def __init__(self, fully_qualified_rule_name: str, expressions: Dict[str, Any]):
self.fully_qualified_rule_name = fully_qualified_rule_name
self.expressions = expressions


class RuleChannelReference:
"""
Convenient wrapper to map fully qualified rule names to relevant channel references
when creating rules from yaml.
"""

fully_qualified_rule_name: str
channel_references: Dict[str, Any]

def __init__(self, fully_qualified_rule_name: str, channel_references: Dict[str, Any]):
self.fully_qualified_rule_name = fully_qualified_rule_name
self.channel_references = channel_references

0 comments on commit 080e763

Please sign in to comment.