From 4d3eb9622466590d571b65ed0a107a7567bc946f Mon Sep 17 00:00:00 2001 From: Benji Nguyen <45523555+solidiquis@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:24:51 -0700 Subject: [PATCH] python(feature): rules (#45) --- .github/workflows/python_ci.yaml | 2 +- examples/python/annotations/main.py | 6 +- examples/python/ingest_with_config/main.py | 24 +- .../sift_ingestion_utils.py | 27 +- python/examples/rule_expressions.example.yaml | 4 + python/examples/telemetry_config.example.yml | 42 +- python/lib/sift_internal/__init__.py | 2 +- python/lib/sift_internal/convert/json.py | 24 + python/lib/sift_internal/convert/protobuf.py | 37 +- .../test_util/__init__.py} | 0 python/lib/sift_internal/test_util/channel.py | 66 +++ python/lib/sift_internal/types.py | 14 - python/lib/sift_py/grpc/interceptors.py | 5 +- python/lib/sift_py/grpc/transport.py | 15 +- python/lib/sift_py/ingestion/channel.py | 161 ++++-- .../lib/sift_py/ingestion/config/__init__.py | 40 -- .../lib/sift_py/ingestion/config/telemetry.py | 105 ++++ .../ingestion/config/telemetry_test.py | 104 ++++ python/lib/sift_py/ingestion/config/yaml.py | 468 ++++++++++++++--- .../lib/sift_py/ingestion/config/yaml_test.py | 145 +++++- python/lib/sift_py/ingestion/error.py | 14 - python/lib/sift_py/ingestion/flow.py | 51 +- python/lib/sift_py/ingestion/impl/__init__.py | 2 +- python/lib/sift_py/ingestion/impl/channel.py | 38 ++ python/lib/sift_py/ingestion/impl/error.py | 10 + python/lib/sift_py/ingestion/impl/ingest.py | 221 ++++++-- .../lib/sift_py/ingestion/impl/ingest_test.py | 474 ++++++++++++++++++ .../ingestion/impl/ingestion_config.py | 80 ++- python/lib/sift_py/ingestion/impl/rule.py | 49 ++ python/lib/sift_py/ingestion/impl/run.py | 45 ++ python/lib/sift_py/ingestion/rule/__init__.py | 3 + python/lib/sift_py/ingestion/rule/config.py | 199 ++++++++ .../lib/sift_py/ingestion/rule/config_test.py | 72 +++ python/lib/sift_py/ingestion/service.py | 57 ++- python/lib/sift_py/rule/__init__.py | 3 - 
python/pyproject.toml | 14 +- python/scripts/dev | 20 +- scripts/gen.sh | 10 +- scripts/setup.py | 14 +- 39 files changed, 2304 insertions(+), 363 deletions(-) create mode 100644 python/examples/rule_expressions.example.yaml create mode 100644 python/lib/sift_internal/convert/json.py rename python/lib/{sift_py/rule/config.py => sift_internal/test_util/__init__.py} (100%) create mode 100644 python/lib/sift_internal/test_util/channel.py delete mode 100644 python/lib/sift_internal/types.py create mode 100644 python/lib/sift_py/ingestion/config/telemetry.py create mode 100644 python/lib/sift_py/ingestion/config/telemetry_test.py delete mode 100644 python/lib/sift_py/ingestion/error.py create mode 100644 python/lib/sift_py/ingestion/impl/channel.py create mode 100644 python/lib/sift_py/ingestion/impl/error.py create mode 100644 python/lib/sift_py/ingestion/impl/ingest_test.py create mode 100644 python/lib/sift_py/ingestion/impl/rule.py create mode 100644 python/lib/sift_py/ingestion/impl/run.py create mode 100644 python/lib/sift_py/ingestion/rule/__init__.py create mode 100644 python/lib/sift_py/ingestion/rule/config.py create mode 100644 python/lib/sift_py/ingestion/rule/config_test.py delete mode 100644 python/lib/sift_py/rule/__init__.py diff --git a/.github/workflows/python_ci.yaml b/.github/workflows/python_ci.yaml index e2fedaf5..e599a196 100644 --- a/.github/workflows/python_ci.yaml +++ b/.github/workflows/python_ci.yaml @@ -18,7 +18,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v2 with: - python-version: "3.8" + python-version: "3.9" - name: Pip install working-directory: python diff --git a/examples/python/annotations/main.py b/examples/python/annotations/main.py index 9284c603..a200d139 100644 --- a/examples/python/annotations/main.py +++ b/examples/python/annotations/main.py @@ -18,11 +18,13 @@ credentials = grpc.ssl_channel_credentials() call_credentials = grpc.access_token_call_credentials(API_KEY) - composite_credentials = 
grpc.composite_channel_credentials(credentials, call_credentials) + composite_credentials = grpc.composite_channel_credentials( + credentials, call_credentials + ) with grpc.secure_channel(BASE_URI, composite_credentials) as channel: annotation_service = AnnotationServiceStub(channel) - request = ListAnnotationsRequest(filter=f"name.matches(\"(?i){name}\")") + request = ListAnnotationsRequest(filter=f'name.matches("(?i){name}")') response = annotation_service.ListAnnotations(request) print(response) channel.close() diff --git a/examples/python/ingest_with_config/main.py b/examples/python/ingest_with_config/main.py index 24f51c84..359e8fce 100644 --- a/examples/python/ingest_with_config/main.py +++ b/examples/python/ingest_with_config/main.py @@ -8,6 +8,7 @@ import sift.ingestion_configs.v1.ingestion_configs_pb2 as ingestconf import sift.runs.v2.runs_pb2 as runpb + def main(): """ This is an example script demonstrating how to use Sift's IngestService_IngestWithConfigDataStream @@ -39,12 +40,14 @@ def main(): with ingest.use_secure_channel(api_key, base_uri) as channel: channel_configs = [] - for channel_config in ExampleTestRunConfig.CHANNEL_CONFIG_PARAMS: + for channel_config in ExampleTestRunConfig.CHANNEL_CONFIG_PARAMS: name, component, desc, unit = channel_config conf = ingest.create_double_type_channel_config(name, component, desc, unit) channel_configs.append(conf) - flow_config = ingest.create_flow_config(ExampleTestRunConfig.FLOW_NAME, *channel_configs) + flow_config = ingest.create_flow_config( + ExampleTestRunConfig.FLOW_NAME, *channel_configs + ) print("Creating ingestion config... 
", end="") ingestion_config = ingest.create_ingestion_config( @@ -67,12 +70,15 @@ def main(): print(f"ok [run_id {run.run_id}]") print("Beginning ingestion...") - ingestion_simulator = create_ingestion_simulator(run, ingestion_config, flow_config) + ingestion_simulator = create_ingestion_simulator( + run, ingestion_config, flow_config + ) ingest.ingest_with_config(channel, ingestion_simulator) print("Ingestion completed.") channel.close() + class ExampleTestRunConfig: """ This is a sample of various properties we'll use to constitute an asset, its associated channels, @@ -87,11 +93,14 @@ class ExampleTestRunConfig: FLOW_NAME = "example_flow" # (name, component, description, units) - CHANNEL_CONFIG_PARAMS: List[Tuple[str, Optional[str], Optional[str], Optional[str]]] = [ + CHANNEL_CONFIG_PARAMS: List[ + Tuple[str, Optional[str], Optional[str], Optional[str]] + ] = [ ("pressure", None, "pressure applied", "mmHg"), ("velocity", "mainmotor", None, "m/s"), ] + def create_ingestion_simulator( run: runpb.Run, ingestion_config: ingestconf.IngestionConfig, @@ -103,7 +112,7 @@ def create_ingestion_simulator( some amount of data points (100 by default) 5ms apart that will get ingested into the Sift API. 
""" - current_timestamp = run.start_time + current_timestamp = run.start_time total_messages_sent = 0 for i in range(num_data_points): @@ -127,9 +136,12 @@ def create_ingestion_simulator( request.channel_values.append(velocity) total_messages_sent += 1 - print(f"Sending message [time={timestamp.ToJsonString()} run={run.run_id} total_messages_sent={total_messages_sent}]") + print( + f"Sending message [time={timestamp.ToJsonString()} run={run.run_id} total_messages_sent={total_messages_sent}]" + ) yield request + if __name__ == "__main__": main() diff --git a/examples/python/ingest_with_config/sift_ingestion_utils.py b/examples/python/ingest_with_config/sift_ingestion_utils.py index 9f1c3333..29bc2c3c 100644 --- a/examples/python/ingest_with_config/sift_ingestion_utils.py +++ b/examples/python/ingest_with_config/sift_ingestion_utils.py @@ -1,7 +1,7 @@ from google.protobuf import timestamp_pb2 from typing import Any, cast, Generator, Optional import grpc -from sift.common.type.v1.channel_data_type_pb2 import ChannelDataType +from sift.common.type.v1.channel_data_type_pb2 import ChannelDataType import sift.ingest.v1.ingest_pb2 as ingest import sift.ingest.v1.ingest_pb2_grpc as ingest_grpc import sift.ingestion_configs.v1.ingestion_configs_pb2 as ingestconf @@ -9,6 +9,7 @@ import sift.runs.v2.runs_pb2 as run import sift.runs.v2.runs_pb2_grpc as run_grpc + def use_secure_channel(api_key: str, base_uri: str) -> grpc.Channel: """ Produces channel that is used to create a secure connection to a gRPC server. 
@@ -16,8 +17,11 @@ def use_secure_channel(api_key: str, base_uri: str) -> grpc.Channel: """ credentials = grpc.ssl_channel_credentials() call_credentials = grpc.access_token_call_credentials(api_key) - composite_credentials = grpc.composite_channel_credentials(credentials, call_credentials) - return grpc.secure_channel(base_uri,composite_credentials) + composite_credentials = grpc.composite_channel_credentials( + credentials, call_credentials + ) + return grpc.secure_channel(base_uri, composite_credentials) + def create_double_type_channel_config( name: str, @@ -42,7 +46,10 @@ def create_double_type_channel_config( return config -def create_flow_config(flow_name: str, *channel_configs: ingestconf.ChannelConfig) -> ingestconf.FlowConfig: + +def create_flow_config( + flow_name: str, *channel_configs: ingestconf.ChannelConfig +) -> ingestconf.FlowConfig: """ Creates a flow config that describes a group of channels that will telemeter data. """ @@ -53,6 +60,7 @@ def create_flow_config(flow_name: str, *channel_configs: ingestconf.ChannelConfi return config + def create_ingestion_config( channel: grpc.Channel, asset_name: str, @@ -67,10 +75,13 @@ def create_ingestion_config( for flow_config in flow_configs: request.flows.append(flow_config) - response = ingestconf_grpc.IngestionConfigServiceStub(channel).CreateIngestionConfig(request) + response = ingestconf_grpc.IngestionConfigServiceStub( + channel + ).CreateIngestionConfig(request) return cast(ingestconf.CreateIngestionConfigResponse, response).ingestion_config + def create_run( channel: grpc.Channel, name: str, @@ -102,6 +113,7 @@ def create_run( return cast(run.CreateRunResponse, response).run + def ingest_with_config( channel: grpc.Channel, ingestion_iter: Generator[ingest.IngestWithConfigDataStreamRequest, Any, None], @@ -111,10 +123,13 @@ def ingest_with_config( Data should be available to view shortly after this function concludes """ try: - 
ingest_grpc.IngestServiceStub(channel).IngestWithConfigDataStream(ingestion_iter) + ingest_grpc.IngestServiceStub(channel).IngestWithConfigDataStream( + ingestion_iter + ) except Exception as e: print(f"Something went wrong during ingestion: {e}") + def delete_run(run_id: str, api_key: str, base_uri: str): """ Handy utility to delete your test runs diff --git a/python/examples/rule_expressions.example.yaml b/python/examples/rule_expressions.example.yaml new file mode 100644 index 00000000..03d2c1c1 --- /dev/null +++ b/python/examples/rule_expressions.example.yaml @@ -0,0 +1,4 @@ +log_substring_contains: + contains($1, $2) +is_even: + mod($1, 2) == 0 diff --git a/python/examples/telemetry_config.example.yml b/python/examples/telemetry_config.example.yml index 604a2663..48b5040f 100644 --- a/python/examples/telemetry_config.example.yml +++ b/python/examples/telemetry_config.example.yml @@ -4,25 +4,25 @@ ingestion_client_key: lunar_vehicle_426 channels: log_channel: &log_channel name: log - data_type: CHANNEL_DATA_TYPE_STRING + data_type: string description: asset logs velocity_channel: &velocity_channel name: velocity - data_type: CHANNEL_DATA_TYPE_DOUBLE + data_type: double description: speed unit: Miles Per Hour component: mainmotor voltage_channel: &voltage_channel name: voltage - data_type: CHANNEL_DATA_TYPE_INT_32 + data_type: int32 description: voltage at the source unit: Volts vehicle_state_channel: &vehicle_state_channel name: vehicle_state - data_type: CHANNEL_DATA_TYPE_ENUM + data_type: enum description: vehicle state unit: vehicle state enum_types: @@ -35,7 +35,7 @@ channels: gpio_channel: &gpio_channel name: gpio - data_type: CHANNEL_DATA_TYPE_BIT_FIELD + data_type: bit_field description: on/off values for pins on gpio bit_field_elements: - name: 12v @@ -51,6 +51,37 @@ channels: index: 7 bit_count: 1 +rules: + - name: overheating + description: Checks for vehicle overheating + expression: $1 == "Accelerating" && $2 > 80 + channel_references: + - $1: 
*vehicle_state_channel + - $2: *voltage_channel + type: review + + - name: speeding + description: Checks high vehicle speed + type: phase + expression: $1 > 20 + channel_references: + - $1: *velocity_channel + + - name: failures + description: Checks for failure logs + type: review + assignee: homer@example.com + expression: + name: log_substring_contains + channel_references: + - $1: *log_channel + sub_expressions: + - $2: ERROR + tags: + - foo + - bar + - baz + flows: - name: readings channels: @@ -67,3 +98,4 @@ flows: - name: logs channels: - <<: *log_channel + diff --git a/python/lib/sift_internal/__init__.py b/python/lib/sift_internal/__init__.py index 6ace3401..6c7a07d0 100644 --- a/python/lib/sift_internal/__init__.py +++ b/python/lib/sift_internal/__init__.py @@ -1,5 +1,5 @@ """ INTERNAL MODULE -Everything inside of this module is not meant to be used publicly. Proceed at your own risk. +Everything inside of this module is not meant to be used publicly. """ diff --git a/python/lib/sift_internal/convert/json.py b/python/lib/sift_internal/convert/json.py new file mode 100644 index 00000000..96d60372 --- /dev/null +++ b/python/lib/sift_internal/convert/json.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import json +from abc import ABC, abstractmethod +from typing import Any + + +class AsJson(ABC): + """ + Utility sub-types that require custom-serialization meant to be used in conjunction with the + `to_json` function. Sub-types should implement `as_json` which should return the object that + you want passed to `json.dumps`. + """ + + @abstractmethod + def as_json(self) -> Any: + pass + + +def to_json(value: Any) -> str: + """ + Serializes `value` to a JSON string uses the `AsJson.as_json` implementation of the type. 
+ """ + return json.dumps(value, default=lambda x: x.as_json()) diff --git a/python/lib/sift_internal/convert/protobuf.py b/python/lib/sift_internal/convert/protobuf.py index 0d02ff78..f46d4293 100644 --- a/python/lib/sift_internal/convert/protobuf.py +++ b/python/lib/sift_internal/convert/protobuf.py @@ -1,35 +1,34 @@ +from __future__ import annotations + from abc import ABC, abstractmethod +from typing import Generic, Type, TypeVar + from google.protobuf.message import Message -from typing import cast, Optional, Type, TypeVar ProtobufMessage = Message +T = TypeVar("T", bound=ProtobufMessage) + -class AsProtobuf(ABC): +class AsProtobuf(ABC, Generic[T]): """ Abstract base class used to create create sub-types that can be treated as an object that can be converted into an instance of `ProtobufMessage`. + + If there are multiple possible protobuf targets then `as_pb` may be overloaded. """ @abstractmethod - def as_pb(self, klass: Type[ProtobufMessage]) -> Optional[ProtobufMessage]: + def as_pb(self, klass: Type[T]) -> T: """ - Performs the conversion into a sub-type of `ProtobufMessage`. Should return `None` - if conversion fails. + Performs the conversion into a sub-type of `ProtobufMessage`. """ pass - -T = TypeVar("T", bound=ProtobufMessage) - - -def try_cast_pb(value: AsProtobuf, target_klass: Type[T]) -> T: - """ - Tries to cast the `value` to `target_klass`, otherwise, returns a `TypeError`. - """ - value_pb = value.as_pb(target_klass) - if isinstance(value_pb, target_klass): - return cast(target_klass, value_pb) - raise TypeError( - f"Expected a '{target_klass.__module__}{target_klass.__name__}' but got {value.__module__}{value.__class__.__name__}" - ) + @classmethod + @abstractmethod + def from_pb(cls, message: Type[T]) -> T: + """ + Converts a protobuf object to the type of the sub-class class. 
+ """ + pass diff --git a/python/lib/sift_py/rule/config.py b/python/lib/sift_internal/test_util/__init__.py similarity index 100% rename from python/lib/sift_py/rule/config.py rename to python/lib/sift_internal/test_util/__init__.py diff --git a/python/lib/sift_internal/test_util/channel.py b/python/lib/sift_internal/test_util/channel.py new file mode 100644 index 00000000..f6aadb4c --- /dev/null +++ b/python/lib/sift_internal/test_util/channel.py @@ -0,0 +1,66 @@ +from grpc_testing import Channel + + +class MockChannel(Channel): + def take_unary_unary(self, method_descriptor): + pass + + def take_unary_stream(self, method_descriptor): + pass + + def take_stream_unary(self, method_descriptor): + pass + + def take_stream_stream(self, method_descriptor): + pass + + def subscribe(self, callback, try_to_connect=False): + pass + + def unsubscribe(self, callback): + pass + + def unary_unary( + self, + method, + request_serializer=None, + response_deserializer=None, + _registered_method=False, + ): + pass + + def unary_stream( + self, + method, + request_serializer=None, + response_deserializer=None, + _registered_method=False, + ): + pass + + def stream_unary( + self, + method, + request_serializer=None, + response_deserializer=None, + _registered_method=False, + ): + pass + + def stream_stream( + self, + method, + request_serializer=None, + response_deserializer=None, + _registered_method=False, + ): + pass + + def close(self): + pass + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass diff --git a/python/lib/sift_internal/types.py b/python/lib/sift_internal/types.py deleted file mode 100644 index a761e6c6..00000000 --- a/python/lib/sift_internal/types.py +++ /dev/null @@ -1,14 +0,0 @@ -from typing import Any, Optional, Type, TypeVar - -T = TypeVar("T") - - -def any_as(value: Any, target_klass: Type[T]) -> Optional[T]: - """ - Attempts to convert `value` of type `Any` to `target_klass`, otherwise return `None`. 
- """ - - if isinstance(value, target_klass): - return value - else: - return None diff --git a/python/lib/sift_py/grpc/interceptors.py b/python/lib/sift_py/grpc/interceptors.py index 3e7eb358..64c2ccaf 100644 --- a/python/lib/sift_py/grpc/interceptors.py +++ b/python/lib/sift_py/grpc/interceptors.py @@ -8,10 +8,11 @@ """ from __future__ import annotations -from grpc_interceptor import ClientInterceptor, ClientCallDetails -from typing import Any, Callable, cast, List, Tuple + +from typing import Any, Callable, List, Tuple, cast import grpc +from grpc_interceptor import ClientCallDetails, ClientInterceptor Metadata = List[Tuple[str, str]] diff --git a/python/lib/sift_py/grpc/transport.py b/python/lib/sift_py/grpc/transport.py index c38c0861..fb2194ae 100644 --- a/python/lib/sift_py/grpc/transport.py +++ b/python/lib/sift_py/grpc/transport.py @@ -5,11 +5,13 @@ """ from __future__ import annotations -from grpc_interceptor import ClientInterceptor -from .interceptors import Metadata, MetadataInterceptor + from typing import List, TypedDict import grpc +from grpc_interceptor import ClientInterceptor + +from .interceptors import Metadata, MetadataInterceptor SiftChannel = grpc.Channel @@ -48,8 +50,9 @@ def _metadata_interceptor(config: SiftChannelConfig) -> ClientInterceptor: """ Any new metadata goes here. """ + apikey = config["apikey"] metadata: Metadata = [ - ("authorization", f"Bearer {config["apikey"]}"), + ("authorization", f"Bearer {apikey}"), ] return MetadataInterceptor(metadata) @@ -59,8 +62,10 @@ class SiftChannelConfig(TypedDict): Config class used to instantiate a `SiftChannel` via `use_sift_channel`. Attributes: - uri: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted. - apikey: User-generated API key generated via the Sift application. + `uri`: + The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted. + `apikey`: + User-generated API key generated via the Sift application. 
""" uri: str diff --git a/python/lib/sift_py/ingestion/channel.py b/python/lib/sift_py/ingestion/channel.py index f044d4be..bac4d56a 100644 --- a/python/lib/sift_py/ingestion/channel.py +++ b/python/lib/sift_py/ingestion/channel.py @@ -1,20 +1,21 @@ from __future__ import annotations -from google.protobuf.empty_pb2 import Empty -from sift_internal.convert.protobuf import AsProtobuf, ProtobufMessage, try_cast_pb + from enum import Enum -from sift.common.type.v1.channel_enum_type_pb2 import ( - ChannelEnumType as ChannelEnumTypePb, -) +from typing import List, Optional, Type, TypedDict + +import sift.common.type.v1.channel_data_type_pb2 as channel_pb +from google.protobuf.empty_pb2 import Empty +from sift.channels.v2.channels_pb2 import Channel as ChannelPb from sift.common.type.v1.channel_bit_field_element_pb2 import ( ChannelBitFieldElement as ChannelBitFieldElementPb, ) -from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataChannelValue -from sift.ingestion_configs.v1.ingestion_configs_pb2 import ( - ChannelConfig as ChannelConfigPb, +from sift.common.type.v1.channel_enum_type_pb2 import ( + ChannelEnumType as ChannelEnumTypePb, ) -from typing import List, Optional, Type, TypedDict, Union - -import sift.common.type.v1.channel_data_type_pb2 as channel_pb +from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataChannelValue +from sift.ingestion_configs.v1.ingestion_configs_pb2 import ChannelConfig as ChannelConfigPb +from sift_internal.convert.protobuf import AsProtobuf +from typing_extensions import NotRequired, Self class ChannelValue(TypedDict): @@ -23,7 +24,7 @@ class ChannelValue(TypedDict): """ channel_name: str - component: Union[str, None] + component: NotRequired[str] value: IngestWithConfigDataChannelValue @@ -58,19 +59,40 @@ def __init__( self.bit_field_elements = bit_field_elements self.enum_types = enum_types - def as_pb(self, klass: Type[ProtobufMessage]) -> Optional[ProtobufMessage]: - return ChannelConfigPb( + def as_pb(self, klass: 
Type[ChannelConfigPb]) -> ChannelConfigPb: + return klass( name=self.name, component=self.component or "", unit=self.unit or "", description=self.description or "", data_type=self.data_type.value, - enum_types=[try_cast_pb(etype, ChannelEnumTypePb) for etype in self.enum_types], + enum_types=[etype.as_pb(ChannelEnumTypePb) for etype in self.enum_types], bit_field_elements=[ - try_cast_pb(el, ChannelBitFieldElementPb) for el in self.bit_field_elements + el.as_pb(ChannelBitFieldElementPb) for el in self.bit_field_elements ], ) + @classmethod + def from_pb(cls, message: ChannelConfigPb) -> Self: + return cls( + name=message.name, + data_type=ChannelDataType.from_pb(message.data_type), + description=message.description, + unit=message.unit, + component=message.component, + bit_field_elements=[ + ChannelBitFieldElement.from_pb(el) for el in message.bit_field_elements + ], + enum_types=[ChannelEnumType.from_pb(etype) for etype in message.enum_types], + ) + + def fqn(self) -> str: + """ + The fully-qualified channel name of a channel called 'voltage' is simply `voltage'. The + fully qualified name of a channel called 'temperature' of component 'motor' is a `motor.temperature'. 
+ """ + return channel_fqn(self) + class ChannelBitFieldElement(AsProtobuf): name: str @@ -82,13 +104,21 @@ def __init__(self, name: str, index: int, bit_count: int): self.index = index self.bit_count = bit_count - def as_pb(self, klass: Type[ProtobufMessage]) -> Optional[ProtobufMessage]: - return ChannelBitFieldElementPb( + def as_pb(self, klass: Type[ChannelBitFieldElementPb]) -> ChannelBitFieldElementPb: + return klass( name=self.name, index=self.index, bit_count=self.bit_count, ) + @classmethod + def from_pb(cls, message: ChannelBitFieldElementPb) -> Self: + return cls( + name=message.name, + index=message.index, + bit_count=message.bit_count, + ) + class ChannelEnumType(AsProtobuf): name: str @@ -98,8 +128,12 @@ def __init__(self, name: str, key: int): self.name = name self.key = key - def as_pb(self, klass: Type[ProtobufMessage]) -> Optional[ProtobufMessage]: - return ChannelEnumTypePb(name=self.name, key=self.key) + def as_pb(self, klass: Type[ChannelEnumTypePb]) -> ChannelEnumTypePb: + return klass(name=self.name, key=self.key) + + @classmethod + def from_pb(cls, message: ChannelEnumTypePb) -> Self: + return cls(name=message.name, key=message.key) class ChannelDataType(Enum): @@ -118,32 +152,99 @@ class ChannelDataType(Enum): UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32 UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64 + @classmethod + def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType": + if val == cls.DOUBLE: + return cls.DOUBLE + elif val == cls.STRING: + return cls.STRING + elif val == cls.ENUM: + return cls.ENUM + elif val == cls.BIT_FIELD: + return cls.BIT_FIELD + elif val == cls.BOOL: + return cls.BOOL + elif val == cls.FLOAT: + return cls.FLOAT + elif val == cls.INT_32: + return cls.INT_32 + elif val == cls.INT_64: + return cls.INT_64 + elif val == cls.UINT_32: + return cls.UINT_32 + elif val == cls.UINT_64: + return cls.UINT_64 + else: + raise ValueError(f"Unknown channel data type '{val}'.") + @classmethod def 
from_str(cls, val: str) -> Optional["ChannelDataType"]: - if val == "CHANNEL_DATA_TYPE_DOUBLE": + val = val.strip() + + if val == "CHANNEL_DATA_TYPE_DOUBLE" or val == ChannelDataTypeStrRep.DOUBLE.value: return cls.DOUBLE - elif val == "CHANNEL_DATA_TYPE_STRING": + elif val == "CHANNEL_DATA_TYPE_STRING" or val == ChannelDataTypeStrRep.STRING.value: return cls.STRING - elif val == "CHANNEL_DATA_TYPE_ENUM": + elif val == "CHANNEL_DATA_TYPE_ENUM" or val == ChannelDataTypeStrRep.ENUM.value: return cls.ENUM - elif val == "CHANNEL_DATA_TYPE_BIT_FIELD": + elif val == "CHANNEL_DATA_TYPE_BIT_FIELD" or val == ChannelDataTypeStrRep.BIT_FIELD.value: return cls.BIT_FIELD - elif val == "CHANNEL_DATA_TYPE_BOOL": + elif val == "CHANNEL_DATA_TYPE_BOOL" or val == ChannelDataTypeStrRep.BOOL.value: return cls.BOOL - elif val == "CHANNEL_DATA_TYPE_FLOAT": + elif val == "CHANNEL_DATA_TYPE_FLOAT" or val == ChannelDataTypeStrRep.FLOAT.value: return cls.FLOAT - elif val == "CHANNEL_DATA_TYPE_INT_32": + elif val == "CHANNEL_DATA_TYPE_INT_32" or val == ChannelDataTypeStrRep.INT_32.value: return cls.INT_32 - elif val == "CHANNEL_DATA_TYPE_INT_64": + elif val == "CHANNEL_DATA_TYPE_INT_64" or val == ChannelDataTypeStrRep.INT_64.value: return cls.INT_64 - elif val == "CHANNEL_DATA_TYPE_UINT_32": + elif val == "CHANNEL_DATA_TYPE_UINT_32" or val == ChannelDataTypeStrRep.UINT_32.value: return cls.UINT_32 - elif val == "CHANNEL_DATA_TYPE_UINT_64": + elif val == "CHANNEL_DATA_TYPE_UINT_64" or val == ChannelDataTypeStrRep.UINT_64.value: return cls.UINT_64 return None +class ChannelDataTypeStrRep(Enum): + DOUBLE = "double" + STRING = "string" + ENUM = "enum" + BIT_FIELD = "bit_field" + BOOL = "bool" + FLOAT = "float" + INT_32 = "int32" + INT_64 = "int64" + UINT_32 = "uint32" + UINT_64 = "uint64" + + +def channel_fqn(channel: ChannelConfig | ChannelValue | ChannelPb) -> str: + """ + Computes the fully qualified channel name. 
+ + The fully-qualified channel name of a channel called 'voltage' is simply `voltage'. The + fully qualified name of a channel called 'temperature' of component 'motor' is a `motor.temperature'. + """ + + if isinstance(channel, ChannelConfig): + if channel.component is None or len(channel.component) == "": + return channel.name + else: + return f"{channel.component}.{channel.name}" + elif isinstance(channel, ChannelPb): + if channel.component is None or len(channel.component) == "": + return channel.name + else: + return f"{channel.component}.{channel.name}" + else: + component = channel.get("component", "") + channel_name = channel["channel_name"] + if len(component) == 0: + return channel_name + else: + return f"{component}.{channel_name}" + + def string_value(val: str) -> IngestWithConfigDataChannelValue: return IngestWithConfigDataChannelValue(string=val) diff --git a/python/lib/sift_py/ingestion/config/__init__.py b/python/lib/sift_py/ingestion/config/__init__.py index 517431d7..0e2a36c8 100644 --- a/python/lib/sift_py/ingestion/config/__init__.py +++ b/python/lib/sift_py/ingestion/config/__init__.py @@ -1,43 +1,3 @@ """ Contains the in memory representation of a telemetry config used to configure ingestion. """ - -from __future__ import annotations -from ..flow import FlowConfig -from typing import List, Optional - - -class TelemetryConfig: - """ - Configurations necessary to start ingestion. - - Attributes: - asset_name: The name of the asset that you wish to telemeter data for. - ingestion_client_key: An arbitrary string completely chosen by the user to uniquely identify - this ingestion configuration. It should be unique with respect to your - organization. - - flows: The list of `FlowConfig`. A single flow can specify a single channel value - or a set of channel values, with each value belonging to a different channel. Channels - that send data at the same frequency and time should be in the same flow. 
- - organization_id: ID of your organization in Sift. This field is only required if your user - belongs to multiple organizations - """ - - asset_name: str - ingestion_client_key: str - organization_id: Optional[str] - flows: List[FlowConfig] - - def __init__( - self, - asset_name: str, - ingestion_client_key: str, - organization_id: Optional[str] = None, - flows: List[FlowConfig] = [], - ): - self.asset_name = asset_name - self.ingestion_client_key = ingestion_client_key - self.organization_id = organization_id - self.flows = flows diff --git a/python/lib/sift_py/ingestion/config/telemetry.py b/python/lib/sift_py/ingestion/config/telemetry.py new file mode 100644 index 00000000..5dbe014c --- /dev/null +++ b/python/lib/sift_py/ingestion/config/telemetry.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +from typing import List, Optional + +from sift_py.ingestion.flow import FlowConfig +from sift_py.ingestion.rule.config import RuleConfig + + +class TelemetryConfig: + """ + Configurations necessary to start ingestion. + + Attributes: + `asset_name`: + The name of the asset that you wish to telemeter data for. + `ingestion_client_key`: + An arbitrary string chosen by the user to uniquely identify this ingestion configuration. + `flows`: + A single flow can specify a single channel value or a set of channel values that are ingested together. + `organization_id`: + ID of your organization in Sift. This field is only required if your user belongs to multiple organizations. + `rules`: + Rules to evaluate during ingestion. 
+ """ + + asset_name: str + ingestion_client_key: str + organization_id: Optional[str] + flows: List[FlowConfig] + rules: List[RuleConfig] + + def __init__( + self, + asset_name: str, + ingestion_client_key: str, + organization_id: Optional[str] = None, + flows: List[FlowConfig] = [], + rules: List[RuleConfig] = [], + ): + """ + Will raise a `TelemetryConfigValidationError` under the following conditions: + - Multiple flows with the same name + - Multiple rules with the same name + - Identical channels in the same flow + """ + self.__class__.validate_flows(flows) + self.__class__.validate_rules(rules) + + self.asset_name = asset_name + self.ingestion_client_key = ingestion_client_key + self.organization_id = organization_id + self.flows = flows + self.rules = rules + + @staticmethod + def validate_rules(rules: List[RuleConfig]): + """ + Ensure that there are no rules with identical names + """ + seen_rule_names = set() + + for rule in rules: + if rule.name in seen_rule_names: + raise TelemetryConfigValidationError( + f"Can't have two rules with identical names, '{rule.name}'." + ) + seen_rule_names.add(rule.name) + + @staticmethod + def validate_flows(flows: List[FlowConfig]): + """ + Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception. + """ + flow_names = set() + + for flow in flows: + seen_channels = set() + + if flow.name in flow_names: + raise TelemetryConfigValidationError( + f"Can't have two flows with the same name, '{flow.name}'." + ) + + flow_names.add(flow.name) + + for channel in flow.channels: + fqn = channel.fqn() + + if fqn in seen_channels: + raise TelemetryConfigValidationError( + f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'." 
+ ) + else: + seen_channels.add(fqn) + + +class TelemetryConfigValidationError(Exception): + """ + When the telemetry config has invalid properties + """ + + message: str + + def __init__(self, message: str): + super().__init__(message) diff --git a/python/lib/sift_py/ingestion/config/telemetry_test.py b/python/lib/sift_py/ingestion/config/telemetry_test.py new file mode 100644 index 00000000..6398ae6b --- /dev/null +++ b/python/lib/sift_py/ingestion/config/telemetry_test.py @@ -0,0 +1,104 @@ +import pytest +from sift_py.ingestion.channel import ChannelConfig, ChannelDataType +from sift_py.ingestion.config.telemetry import TelemetryConfig, TelemetryConfigValidationError +from sift_py.ingestion.flow import FlowConfig +from sift_py.ingestion.rule.config import RuleActionCreateDataReviewAnnotation, RuleConfig + + +def test_telemetry_config_validations_duplicate_rules(): + channel = ChannelConfig( + name="my_channel", + data_type=ChannelDataType.DOUBLE, + ) + + rule_on_my_channel_a = RuleConfig( + name="rule_a", + description="", + expression="$1 > 10", + channel_references={ + "$1": channel, + }, + action=RuleActionCreateDataReviewAnnotation( + assignee="bob@example.com", + tags=["barometer"], + ), + ) + + another_rule_on_my_channel_a = RuleConfig( + name="rule_a", # same name + description="", + expression="$1 > 11", + channel_references={ + "$1": channel, + }, + action=RuleActionCreateDataReviewAnnotation( + assignee="bob@example.com", + tags=["barometer"], + ), + ) + + with pytest.raises(TelemetryConfigValidationError, match="Can't have two rules"): + TelemetryConfig( + asset_name="my_asset", + ingestion_client_key="my_asset_key", + organization_id="my_organization_id", + flows=[ + FlowConfig( + name="my_flow", + channels=[channel], + ) + ], + rules=[rule_on_my_channel_a, another_rule_on_my_channel_a], + ) + + +def test_telemetry_config_validations_duplicate_channels(): + channel = ChannelConfig( + name="my_channel", + data_type=ChannelDataType.DOUBLE, + ) + + with 
pytest.raises(TelemetryConfigValidationError, match="Can't have two identical channels"): + TelemetryConfig( + asset_name="my_asset", + ingestion_client_key="my_asset_key", + organization_id="my_organization_id", + flows=[ + FlowConfig( + name="my_flow", + channels=[ + channel, + channel, + ], + ) + ], + ) + + +def test_telemetry_config_validations_flows_with_same_name(): + channel = ChannelConfig( + name="my_channel", + data_type=ChannelDataType.DOUBLE, + ) + + channel_b = ChannelConfig( + name="my_other_channel", + data_type=ChannelDataType.DOUBLE, + ) + + with pytest.raises(TelemetryConfigValidationError, match="Can't have two flows"): + TelemetryConfig( + asset_name="my_asset", + ingestion_client_key="my_asset_key", + organization_id="my_organization_id", + flows=[ + FlowConfig( + name="my_flow", + channels=[channel], + ), + FlowConfig( + name="my_flow", + channels=[channel_b], + ), + ], + ) diff --git a/python/lib/sift_py/ingestion/config/yaml.py b/python/lib/sift_py/ingestion/config/yaml.py index 476985f6..d268a681 100644 --- a/python/lib/sift_py/ingestion/config/yaml.py +++ b/python/lib/sift_py/ingestion/config/yaml.py @@ -1,16 +1,161 @@ from __future__ import annotations -from ..channel import ChannelDataType, ChannelBitFieldElement, ChannelEnumType -from ..error import YamlConfigError -from ..flow import ChannelConfig, FlowConfig + +from collections.abc import Iterable from pathlib import Path -from sift_internal.types import any_as -from typing import Any, Dict, List -from . 
import TelemetryConfig +from typing import Dict, List, Literal, Optional, TypedDict, cast import yaml +from sift_py.ingestion.channel import ( + ChannelBitFieldElement, + ChannelDataType, + ChannelEnumType, + channel_fqn, +) +from sift_py.ingestion.flow import ChannelConfig, FlowConfig +from sift_py.ingestion.rule.config import ( + RuleActionAnnotationKind, + RuleActionCreateDataReviewAnnotation, + RuleActionCreatePhaseAnnotation, + RuleConfig, +) +from typing_extensions import NotRequired + +from .telemetry import TelemetryConfig + + +class TelemetryConfigYamlSpec(TypedDict): + """ + Formal spec that defines what the telemetry config should look like in YAML. + """ + + asset_name: str + ingestion_client_key: str + organization_id: NotRequired[str] + channels: Dict[str, ChannelConfigYamlSpec] + rules: NotRequired[List[RuleYamlSpec]] + flows: NotRequired[List[FlowYamlSpec]] + + +class ChannelConfigYamlSpec(TypedDict): + """ + Formal spec that defines what a channel should look like in YAML. + """ + + name: str + description: NotRequired[str] + unit: NotRequired[str] + component: NotRequired[str] + data_type: ( + Literal["double"] + | Literal["string"] + | Literal["enum"] + | Literal["bit_field"] + | Literal["bool"] + | Literal["float"] + | Literal["int32"] + | Literal["int64"] + | Literal["uint32"] + | Literal["uint64"] + ) + enum_types: NotRequired[List[ChannelEnumTypeYamlSpec]] + bit_field_elements: NotRequired[List[ChannelBitFieldElementYamlSpec]] + + +class ChannelEnumTypeYamlSpec(TypedDict): + """ + Formal spec that defines what a channel enum type should look like in YAML. + """ + + name: str + key: int + + +class ChannelBitFieldElementYamlSpec(TypedDict): + """ + Formal spec that defines what a bit-field element should look like in YAML. + """ + + name: str + index: int + bit_count: int + + +class FlowYamlSpec(TypedDict): + """ + Formal spec that defines what a flow should look like in YAML. 
+ """ + + name: str + channels: List[ChannelConfigYamlSpec] + + +class YamlLoadOptions(TypedDict): + """ + Options to use when loading a telemetry config form YAML. + + Attributes: + `named_expressions`: + A list of look up paths for YAML files containing named expressions. Could also just be a YAML str. + """ + + named_expressions: List[Path | str] + + +class RulesYamlSpec(TypedDict): + rules: List[RuleYamlSpec] + + +class RuleYamlSpec(TypedDict): + """ + The formal definition of what a single rule looks like in YAML. + """ + + name: str + description: NotRequired[str] + expression: str | NamedExpressionYamlSpec + type: Literal["phase"] | Literal["review"] + assignee: NotRequired[str] + tags: NotRequired[List[str]] + channel_references: NotRequired[List[Dict[str, ChannelConfigYamlSpec]]] + sub_expressions: NotRequired[List[Dict[str, str]]] -def try_load_from_yaml(config_fs_path: Path) -> TelemetryConfig: +class NamedExpressionYamlSpec(TypedDict): + """ + A named, reusable expression. This class is the formal definition + of what a named expression should look like in YAML. + """ + + name: str + + +""" +NamedExpressionsYamlSpec is a type alias for a dictionary where both keys and values are strings. +Note the pluralization in the name to distinguish it from `NamedExpressionYamlSpec`. + +This alias serves as a formal definition for a YAML file that solely contains named expressions. +See `sift_py.ingestion.rule.yaml_test.py` for examples. + +Named expressions are generic expressions that contain placeholders instead of identifiers. They can +be loaded at runtime and referenced in telemetry configs to facilitate reuse. +""" +NamedExpressionsYamlSpec = Dict[str, str] + + +class YamlConfigError(Exception): + """ + When the YAML config has missing or invalid properties. 
+ """ + + message: str + + def __init__(self, message: str): + super().__init__(message) + + +def try_load_from_yaml( + config_fs_path: Path, opts: Optional[YamlLoadOptions] = None +) -> TelemetryConfig: """ Loads in YAML config file and deserializes it into an instance of `TelemetryConfig`. If the YAML config has any malformed or missing properties than a `YamlConfigError` is raised. @@ -22,127 +167,284 @@ def try_load_from_yaml(config_fs_path: Path) -> TelemetryConfig: with open(config_fs_path, "r") as file: content = file.read() - return _try_from_yaml_str(content) + return _try_from_yaml_str(content, opts) + + +def rule_config_from_yaml( + rule_yaml: RuleYamlSpec, + named_expressions: Dict[str, str] = {}, +) -> RuleConfig: + """ + Creates a `RuleConfig` from a `rule_yaml` and an optional `named_expressions` dictionary + if generic named expressions are used. + """ + + rule_name = rule_yaml.get("name") + if rule_name is None or len(rule_name) == 0: + raise YamlConfigError("Expected rule to have a 'name' property.") + + description = rule_yaml.get("description") or "" + + raw_annotation_type = rule_yaml.get("type") + if raw_annotation_type is None: + raise YamlConfigError(f"Expected ruled '{rule_name} to have a 'type' property.") + + annotation_type = RuleActionAnnotationKind.from_str(raw_annotation_type) + + expression = rule_yaml.get("expression") + + if expression is None: + raise YamlConfigError(f"Expected rule '{rule_name}' to have an expression.") + + raw_channel_references = rule_yaml.get("channel_references", []) + channel_references = {} + for raw_channel_reference in raw_channel_references: + for reference, channel_config in raw_channel_reference.items(): + channel_references[reference] = _deserialize_channel_from_yaml(channel_config) + + raw_sub_expressions = rule_yaml.get("sub_expressions", []) + sub_expressions = {} + for raw_sub_expression in raw_sub_expressions: + for reference, value in raw_sub_expression.items(): + sub_expressions[reference] = 
value + + if isinstance(expression, str): + if annotation_type == RuleActionAnnotationKind.REVIEW: + return RuleConfig( + name=rule_name, + description=description, + expression=expression, + action=RuleActionCreateDataReviewAnnotation( + assignee=rule_yaml.get("assignee"), + tags=rule_yaml.get("tags"), + ), + channel_references=channel_references, + sub_expressions=sub_expressions, + ) + else: + return RuleConfig( + name=rule_name, + description=description, + expression=expression, + action=RuleActionCreatePhaseAnnotation( + tags=rule_yaml.get("tags"), + ), + channel_references=channel_references, + sub_expressions=sub_expressions, + ) + elif isinstance(expression, dict): + expression_name = expression.get("name") + if expression_name is None: + raise YamlConfigError("Expected named expression to have a 'name' property.") + + named_expression = named_expressions.get(expression_name) + if named_expression is None: + raise YamlConfigError( + f"Failed to find named expression '{expression_name}' for rule '{rule_name}'." + ) + + if annotation_type == RuleActionAnnotationKind.REVIEW: + return RuleConfig( + name=rule_name, + description=description, + expression=named_expression, + action=RuleActionCreateDataReviewAnnotation( + assignee=rule_yaml.get("assignee"), + tags=rule_yaml.get("tags"), + ), + channel_references=channel_references, + sub_expressions=sub_expressions, + ) + else: + return RuleConfig( + name=rule_name, + description=description, + expression=named_expression, + action=RuleActionCreatePhaseAnnotation( + tags=rule_yaml.get("tags"), + ), + channel_references=channel_references, + sub_expressions=sub_expressions, + ) + else: + raise YamlConfigError( + f"Expected rule '{rule_name}' 'expression' property to be a string or have properties." + ) + + +def try_load_named_expressions_from_yaml( + named_expressions_fs_path: Path, +) -> NamedExpressionsYamlSpec: + """ + Loads in named expressions from a file. 
+ """ + + suffix = named_expressions_fs_path.suffix + if suffix != ".yaml" and suffix != ".yml": + raise YamlConfigError(f"Unsupported file-type '{suffix}', expected YAML.") + + with open(named_expressions_fs_path, "r") as file: + content = file.read() + return cast(NamedExpressionsYamlSpec, yaml.safe_load(content)) -def _try_from_yaml_str(yaml_str: str) -> TelemetryConfig: - config: Dict[Any, Any] = yaml.safe_load(yaml_str) +def _try_from_yaml_str(yaml_str: str, opts: Optional[YamlLoadOptions] = None) -> TelemetryConfig: + config: TelemetryConfigYamlSpec = yaml.safe_load(yaml_str) - asset_name = any_as(config.get("asset_name"), str) + asset_name = config.get("asset_name") if asset_name is None or len(asset_name) == 0: - raise YamlConfigError("Expected a non-blank string for top-level 'asset_name' property") + raise YamlConfigError("Expected a non-blank string for top-level 'asset_name' property.") - ingestion_client_key = any_as(config.get("ingestion_client_key"), str) + ingestion_client_key = config.get("ingestion_client_key") if ingestion_client_key is None or len(ingestion_client_key) == 0: raise YamlConfigError( - "Expected a non-blank string top-level 'ingestion_client_key' property" + "Expected a non-blank string top-level 'ingestion_client_key' property." ) - organization_id = any_as(config.get("organization_id"), str) + organization_id = config.get("organization_id") + + raw_channels = config.get("channels") + if raw_channels is None or len(raw_channels) == 0: + raise YamlConfigError("Expected a top-level non-empty 'channels' property.") - # TODO... parse channels top-level first before flows then assign to flows. 
+ channels = [_deserialize_channel_from_yaml(c) for c in raw_channels.values()] + channels_by_fqn = {channel_fqn(c): c for c in channels} - raw_flows = any_as(config.get("flows"), list) + raw_flows = config.get("flows") if raw_flows is None: - raise YamlConfigError("Expected 'flows' to be a list property") + raise YamlConfigError("Expected 'flows' to be a list property.") + + named_expressions = {} + if opts is not None: + for named_expr in opts.get("named_expressions", []): + named_expressions_from_yaml = {} + + if isinstance(named_expr, str): + named_expressions_from_yaml = cast( + NamedExpressionsYamlSpec, yaml.safe_load(named_expr) + ) + else: + named_expressions_from_yaml = try_load_named_expressions_from_yaml(named_expr) + + for name, expression in named_expressions_from_yaml.items(): + if name in named_expressions: + raise YamlConfigError( + f"Found multiple named expressions with the name '{name}'." + ) + named_expressions[name] = expression + + raw_rules = config.get("rules") + rules = [] + if raw_rules is not None and len(raw_rules) > 0: + for raw_rule in raw_rules: + rule = rule_config_from_yaml(raw_rule, named_expressions) + rules.append(rule) return TelemetryConfig( asset_name=asset_name, ingestion_client_key=ingestion_client_key, organization_id=organization_id, - flows=_deserialize_flows_from_yaml(raw_flows), + flows=_deserialize_flows_from_yaml(raw_flows, channels_by_fqn), + rules=rules, ) -def _deserialize_flows_from_yaml(raw_flow_configs: List[Dict]) -> List[FlowConfig]: +def _deserialize_flows_from_yaml( + raw_flow_configs: Iterable[FlowYamlSpec], + channels_by_fqn: Dict[str, ChannelConfig], +) -> List[FlowConfig]: flow_configs = [] for raw_flow_config in raw_flow_configs: - flow_name = any_as(raw_flow_config.get("name"), str) + flow_name = raw_flow_config.get("name") if flow_name is None or len(flow_name) == 0: raise YamlConfigError("Expected flow to have a non-blank 'name' property") - raw_channel_configs = 
any_as(raw_flow_config.get("channels"), list) + raw_channel_configs = raw_flow_config.get("channels") if raw_channel_configs is None: raise YamlConfigError("Expected 'channels' to be a list property") - flow_config = FlowConfig( - name=flow_name, - channels=_deserialize_channels_from_yaml(raw_channel_configs), - ) - + channels = [_deserialize_channel_from_yaml(c) for c in raw_channel_configs] + seen_channels = set() + + for channel in channels: + fqn = channel_fqn(channel) + if fqn not in channels_by_fqn: + raise YamlConfigError( + f"Flow '{flow_name}' contains channel '{fqn}' that is missing from top-level 'channels' property." + ) + if fqn in seen_channels: + raise YamlConfigError( + f"Channel '{fqn}' cannot appear more than once for flow '{flow_name}'." + ) + seen_channels.add(fqn) + + flow_config = FlowConfig(name=flow_name, channels=channels) flow_configs.append(flow_config) return flow_configs -def _deserialize_channels_from_yaml( - raw_channel_configs: List[Dict], -) -> List[ChannelConfig]: - channel_configs = [] - - for raw_channel_config in raw_channel_configs: - channel_name = any_as(raw_channel_config.get("name"), str) - if channel_name is None or len(channel_name) == 0: - raise YamlConfigError("Expected channel to have a non-blank 'name' property") - - channel_data_type_str = any_as(raw_channel_config.get("data_type"), str) - if channel_data_type_str is None or len(channel_data_type_str) == 0: - raise YamlConfigError("Missing property for 'flows.channel.data_type' property") - - channel_data_type = ChannelDataType.from_str(channel_data_type_str) - if channel_data_type is None: - raise YamlConfigError("Invalid property for 'flows.channel.data_type' property") - - description = any_as(raw_channel_config.get("description"), str) - unit = any_as(raw_channel_config.get("unit"), str) - component = any_as(raw_channel_config.get("component"), str) - - bit_field_elements = [] - raw_bit_field_elements = any_as(raw_channel_config.get("bit_field_elements"), list) - 
if raw_bit_field_elements is not None: - for element in raw_bit_field_elements: - el = _deserialize_bit_field_element_from_yaml(element) - bit_field_elements.append(el) - - enum_types = [] - raw_enum_types = any_as(raw_channel_config.get("enum_types"), list) - if raw_enum_types is not None: - for enum_type in raw_enum_types: - etype = _deserialize_enum_type_from_yaml(enum_type) - enum_types.append(etype) - - channel_config = ChannelConfig( - name=channel_name, - data_type=channel_data_type, - description=description, - unit=unit, - component=component, - bit_field_elements=bit_field_elements, - enum_types=enum_types, - ) - - channel_configs.append(channel_config) - - return channel_configs +def _deserialize_channel_from_yaml( + raw_channel_config: ChannelConfigYamlSpec, +) -> ChannelConfig: + channel_name = raw_channel_config.get("name") + if channel_name is None or len(channel_name) == 0: + raise YamlConfigError("Expected channel to have a non-blank 'name' property") + + channel_data_type_str = raw_channel_config.get("data_type") + if channel_data_type_str is None or len(channel_data_type_str) == 0: + raise YamlConfigError("Missing property for 'flows.channel.data_type' property") + + channel_data_type = ChannelDataType.from_str(channel_data_type_str) + if channel_data_type is None: + raise YamlConfigError("Invalid property for 'flows.channel.data_type' property") + + description = raw_channel_config.get("description") + unit = raw_channel_config.get("unit") + component = raw_channel_config.get("component") + + bit_field_elements = [] + raw_bit_field_elements = raw_channel_config.get("bit_field_elements") + if raw_bit_field_elements is not None: + for element in raw_bit_field_elements: + el = _deserialize_bit_field_element_from_yaml(element) + bit_field_elements.append(el) + + enum_types = [] + raw_enum_types = raw_channel_config.get("enum_types") + if raw_enum_types is not None: + for enum_type in raw_enum_types: + etype = 
_deserialize_enum_type_from_yaml(enum_type) + enum_types.append(etype) + + return ChannelConfig( + name=channel_name, + data_type=channel_data_type, + description=description, + unit=unit, + component=component, + bit_field_elements=bit_field_elements, + enum_types=enum_types, + ) def _deserialize_bit_field_element_from_yaml( - bit_field_element: Dict, + bit_field_element: ChannelBitFieldElementYamlSpec, ) -> ChannelBitFieldElement: - name = any_as(bit_field_element.get("name"), str) + name = bit_field_element.get("name") if name is None or len(name) == 0: raise YamlConfigError( "Expected a non-blank value for 'flows.channels.bit_field_element.name'" ) - index = any_as(bit_field_element.get("index"), int) + index = bit_field_element.get("index") if index is None: raise YamlConfigError( "Expected an integer value for 'flows.channels.bit_field_element.index'" ) - bit_count = any_as(bit_field_element.get("bit_count"), int) + bit_count = bit_field_element.get("bit_count") if bit_count is None: raise YamlConfigError( "Expected an integer value for 'flows.channels.bit_field_element.bit_count'" @@ -155,12 +457,12 @@ def _deserialize_bit_field_element_from_yaml( ) -def _deserialize_enum_type_from_yaml(enum_type: Any) -> ChannelEnumType: - name = any_as(enum_type.get("name"), str) +def _deserialize_enum_type_from_yaml(enum_type: ChannelEnumTypeYamlSpec) -> ChannelEnumType: + name = enum_type.get("name") if name is None or len(name) == 0: raise YamlConfigError("Expected a non-blank value for 'flows.channels.enum_types.name'") - key = any_as(enum_type.get("key"), int) + key = enum_type.get("key") if key is None: raise YamlConfigError("Expected an integer value for 'flows.channels.enum_types.key'") diff --git a/python/lib/sift_py/ingestion/config/yaml_test.py b/python/lib/sift_py/ingestion/config/yaml_test.py index ac43b70b..6b2a03a3 100644 --- a/python/lib/sift_py/ingestion/config/yaml_test.py +++ b/python/lib/sift_py/ingestion/config/yaml_test.py @@ -1,10 +1,26 @@ from 
__future__ import annotations -from .yaml import _try_from_yaml_str -from ..channel import ChannelDataType + +from typing import cast + +import pytest +import yaml +from sift_py.ingestion.channel import ChannelDataType +from sift_py.ingestion.config.yaml import ( + NamedExpressionsYamlSpec, + YamlConfigError, + _try_from_yaml_str, +) +from sift_py.ingestion.rule.config import ( + RuleActionCreateDataReviewAnnotation, + RuleActionCreatePhaseAnnotation, + RuleActionKind, +) def test_telemetry_config(): - telemetry_config = _try_from_yaml_str(TELEMETRY_CONFIG) + telemetry_config = _try_from_yaml_str( + TELEMETRY_CONFIG, {"named_expressions": [TEST_NAMED_EXPRESSIONS_YAML_STR]} + ) assert telemetry_config.asset_name == "LunarVehicle426" assert telemetry_config.ingestion_client_key == "lunar_vehicle_426" assert len(telemetry_config.flows) == 3 @@ -66,34 +82,77 @@ def test_telemetry_config(): assert gpio_channel.bit_field_elements[3].index == 7 assert gpio_channel.bit_field_elements[3].bit_count == 1 + assert len(telemetry_config.rules) == 3 + + overheating_rule, speeding_rule, failures_rule = telemetry_config.rules + + assert overheating_rule.name == "overheating" + assert overheating_rule.description == "Checks for vehicle overheating" + assert overheating_rule.expression == '$1 == "Accelerating" && $2 > 80' + assert overheating_rule.action.kind() == RuleActionKind.ANNOTATION + assert isinstance(overheating_rule.action, RuleActionCreateDataReviewAnnotation) + + assert speeding_rule.name == "speeding" + assert speeding_rule.description == "Checks high vehicle speed" + assert speeding_rule.expression == "$1 > 20" + assert overheating_rule.action.kind() == RuleActionKind.ANNOTATION + assert isinstance(speeding_rule.action, RuleActionCreatePhaseAnnotation) + + assert failures_rule.name == "failures" + assert failures_rule.description == "Checks for failure logs" + assert failures_rule.expression == 'contains($1, "ERROR")' + assert overheating_rule.action.kind() == 
RuleActionKind.ANNOTATION + assert isinstance(failures_rule.action, RuleActionCreateDataReviewAnnotation) + + +def test_no_duplicate_channels_telemetry_config(): + """ + Raise an error if there are duplicate channels in a flow. + """ + with pytest.raises(YamlConfigError): + _ = _try_from_yaml_str(DUPLICATE_CHANNEL_IN_FLOW_TELEMETRY_CONFIG) + + +def test_named_expressions(): + named_expressions = cast( + NamedExpressionsYamlSpec, yaml.safe_load(TEST_NAMED_EXPRESSIONS_YAML_STR) + ) + + log_substring_contains = named_expressions.get("log_substring_contains") + assert log_substring_contains is not None + assert log_substring_contains == "contains($1, $2)" + + is_even = named_expressions.get("is_even") + assert is_even is not None + assert is_even == "mod($1, 2) == 0" + TELEMETRY_CONFIG = """ ---- asset_name: LunarVehicle426 ingestion_client_key: lunar_vehicle_426 channels: log_channel: &log_channel name: log - data_type: CHANNEL_DATA_TYPE_STRING + data_type: string description: asset logs - + velocity_channel: &velocity_channel name: velocity - data_type: CHANNEL_DATA_TYPE_DOUBLE + data_type: double description: speed unit: Miles Per Hour component: mainmotor - + voltage_channel: &voltage_channel name: voltage - data_type: CHANNEL_DATA_TYPE_INT_32 + data_type: int32 description: voltage at the source unit: Volts - + vehicle_state_channel: &vehicle_state_channel name: vehicle_state - data_type: CHANNEL_DATA_TYPE_ENUM + data_type: enum description: vehicle state unit: vehicle state enum_types: @@ -103,10 +162,10 @@ def test_telemetry_config(): key: 1 - name: Stopped key: 2 - + gpio_channel: &gpio_channel name: gpio - data_type: CHANNEL_DATA_TYPE_BIT_FIELD + data_type: bit_field description: on/off values for pins on gpio bit_field_elements: - name: 12v @@ -122,6 +181,37 @@ def test_telemetry_config(): index: 7 bit_count: 1 +rules: + - name: overheating + description: Checks for vehicle overheating + expression: $1 == "Accelerating" && $2 > 80 + channel_references: + - 
$1: *vehicle_state_channel + - $2: *voltage_channel + type: review + + - name: speeding + description: Checks high vehicle speed + type: phase + expression: $1 > 20 + channel_references: + - $1: *velocity_channel + + - name: failures + description: Checks for failure logs + type: review + assignee: homer@example.com + expression: + name: log_substring_contains + channel_references: + - $1: *log_channel + sub_expressions: + - $2: ERROR + tags: + - foo + - bar + - baz + flows: - name: readings channels: @@ -134,8 +224,35 @@ def test_telemetry_config(): channels: - <<: *velocity_channel - <<: *voltage_channel - + - name: logs channels: - <<: *log_channel + +""" + +DUPLICATE_CHANNEL_IN_FLOW_TELEMETRY_CONFIG = """ +asset_name: LunarVehicle426 +ingestion_client_key: lunar_vehicle_426 + +channels: + velocity_channel: &velocity_channel + name: velocity + data_type: double + description: speed + unit: Miles Per Hour + component: mainmotor + +flows: + - name: readings + channels: + - <<: *velocity_channel + - <<: *velocity_channel +""" + +TEST_NAMED_EXPRESSIONS_YAML_STR = """ +log_substring_contains: + contains($1, $2) +is_even: + mod($1, 2) == 0 """ diff --git a/python/lib/sift_py/ingestion/error.py b/python/lib/sift_py/ingestion/error.py deleted file mode 100644 index 09383916..00000000 --- a/python/lib/sift_py/ingestion/error.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -Errors specific to the `sift_py` ingestion module. -""" - - -class YamlConfigError(Exception): - """ - Used when the YAML config has missing or invalid properties. 
- """ - - message: str - - def __init__(self, message: str): - super().__init__(message) diff --git a/python/lib/sift_py/ingestion/flow.py b/python/lib/sift_py/ingestion/flow.py index dbefafd3..b4bd004f 100644 --- a/python/lib/sift_py/ingestion/flow.py +++ b/python/lib/sift_py/ingestion/flow.py @@ -1,11 +1,16 @@ from __future__ import annotations -from .channel import ChannelConfig -from sift_internal.convert.protobuf import try_cast_pb, AsProtobuf, ProtobufMessage + +from typing import Dict, List, Type + from sift.ingestion_configs.v1.ingestion_configs_pb2 import ( ChannelConfig as ChannelConfigPb, +) +from sift.ingestion_configs.v1.ingestion_configs_pb2 import ( FlowConfig as FlowConfigPb, ) -from typing import Dict, List, Optional, Type +from sift_internal.convert.protobuf import AsProtobuf +from sift_py.ingestion.channel import ChannelConfig, channel_fqn +from typing_extensions import Self class FlowConfig(AsProtobuf): @@ -24,35 +29,17 @@ class FlowConfig(AsProtobuf): def __init__(self, name: str, channels: List[ChannelConfig]): self.name = name self.channels = channels - self.channel_by_fqn = { - self.__class__.compute_fqn(c.name, c.component): i for i, c in enumerate(channels) - } - - def get_channel(self, name: str, component: Optional[str] = "") -> Optional[ChannelConfig]: - """ - Retrieves a `ChannelConfig` by its fully qualified name. Returns `None` if it cannot be found. 
- """ - fqn = self.__class__.compute_fqn(name, component) - index = self.channel_by_fqn[fqn] - - try: - return self.channels[index] - except IndexError: - return None - - def as_pb(self, klass: Type[ProtobufMessage]) -> Optional[ProtobufMessage]: - return FlowConfigPb( + self.channel_by_fqn = {channel_fqn(c): i for i, c in enumerate(channels)} + + def as_pb(self, klass: Type[FlowConfigPb]) -> FlowConfigPb: + return klass( name=self.name, - channels=[try_cast_pb(conf, ChannelConfigPb) for conf in self.channels], + channels=[conf.as_pb(ChannelConfigPb) for conf in self.channels], ) - @staticmethod - def compute_fqn(name: str, component: Optional[str]) -> str: - """ - The fully-qualified channel name of a channel called 'voltage' is simply `voltage'. The - fully qualified name of a channel called 'temperature' of component 'motor' is a `motor.temperature'. - """ - if component is None or len(component) == "": - return name - else: - return f"{component}.{name}" + @classmethod + def from_pb(cls, message: FlowConfigPb) -> Self: + return cls( + name=message.name, + channels=[ChannelConfig.from_pb(c) for c in message.channels], + ) diff --git a/python/lib/sift_py/ingestion/impl/__init__.py b/python/lib/sift_py/ingestion/impl/__init__.py index 8dfc9116..8d70f49e 100644 --- a/python/lib/sift_py/ingestion/impl/__init__.py +++ b/python/lib/sift_py/ingestion/impl/__init__.py @@ -1,6 +1,6 @@ """ INTERNAL MODULE -This module contains implementation details that isn't meant to be used directly. +This module contains implementation details that aren't meant to be used directly. APIs in this module are garaunteed to not be stable so proceed at your own risk. 
""" diff --git a/python/lib/sift_py/ingestion/impl/channel.py b/python/lib/sift_py/ingestion/impl/channel.py new file mode 100644 index 00000000..03b45d03 --- /dev/null +++ b/python/lib/sift_py/ingestion/impl/channel.py @@ -0,0 +1,38 @@ +from typing import List, cast + +from sift.channels.v2.channels_pb2 import Channel as ChannelPb +from sift.channels.v2.channels_pb2 import ListChannelsRequest, ListChannelsResponse +from sift.channels.v2.channels_pb2_grpc import ChannelServiceStub +from sift_py.grpc.transport import SiftChannel + + +def get_asset_channels( + transport_channel: SiftChannel, + asset_id: str, +) -> List[ChannelPb]: + """ + Queries all channels for the given `asset_id`. + """ + channels_pb = [] + + svc = ChannelServiceStub(transport_channel) + req = ListChannelsRequest( + filter=f'asset_id=="{asset_id}"', + page_size=1_000, + page_token="", + ) + res = cast(ListChannelsResponse, svc.ListChannels(req)) + channels_pb.extend(res.channels) + next_page_token = res.next_page_token + + while len(next_page_token) > 0: + req = ListChannelsRequest( + filter=f'asset_id=="{asset_id}"', + page_size=1_000, + page_token=next_page_token, + ) + res = cast(ListChannelsResponse, svc.ListChannels(req)) + channels_pb.extend(res.channels) + next_page_token = res.next_page_token + + return channels_pb diff --git a/python/lib/sift_py/ingestion/impl/error.py b/python/lib/sift_py/ingestion/impl/error.py new file mode 100644 index 00000000..2db520ca --- /dev/null +++ b/python/lib/sift_py/ingestion/impl/error.py @@ -0,0 +1,10 @@ +class IngestionValidationError(Exception): + """ + Errors that can occur while initializing the ingestion service + or when creating ingestion requests. 
+ """ + + message: str + + def __init__(self, message: str): + super().__init__(message) diff --git a/python/lib/sift_py/ingestion/impl/ingest.py b/python/lib/sift_py/ingestion/impl/ingest.py index bdaff657..1a4316fe 100644 --- a/python/lib/sift_py/ingestion/impl/ingest.py +++ b/python/lib/sift_py/ingestion/impl/ingest.py @@ -1,35 +1,41 @@ from __future__ import annotations -from ..channel import ChannelValue, is_data_type, empty_value -from ..flow import FlowConfig -from .ingestion_config import ( - get_ingestion_config_by_client_key, - create_ingestion_config, -) -from ..config import TelemetryConfig -from ...grpc.transport import SiftChannel -from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig + +from datetime import datetime +from typing import Dict, List, Optional + +from google.protobuf.timestamp_pb2 import Timestamp from sift.ingest.v1.ingest_pb2 import ( IngestWithConfigDataChannelValue, IngestWithConfigDataStreamRequest, ) from sift.ingest.v1.ingest_pb2_grpc import IngestServiceStub -from sift.runs.v2.runs_pb2 import CreateRunRequest, CreateRunResponse -from sift.runs.v2.runs_pb2_grpc import RunServiceStub -from google.protobuf.timestamp_pb2 import Timestamp -from typing import cast, Dict, List, Optional -from datetime import datetime +from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig +from sift_py.grpc.transport import SiftChannel +from sift_py.ingestion.channel import ChannelValue, channel_fqn, empty_value, is_data_type +from sift_py.ingestion.config.telemetry import TelemetryConfig +from sift_py.ingestion.flow import FlowConfig +from sift_py.ingestion.impl.channel import get_asset_channels +from sift_py.ingestion.impl.error import IngestionValidationError +from sift_py.ingestion.impl.ingestion_config import ( + create_flow_configs, + create_ingestion_config, + get_ingestion_config_by_client_key, + get_ingestion_config_flow_names, +) +from sift_py.ingestion.impl.rule import get_asset_rules_json, 
update_rules +from sift_py.ingestion.impl.run import create_run, get_run_id_by_name +from sift_py.ingestion.rule.config import RuleConfig class IngestionServiceImpl: transport_channel: SiftChannel ingestion_config: IngestionConfig asset_name: str - - # TODO: Multiple flows can have the same name if their channel configs differ... flow_configs_by_name: Dict[str, FlowConfig] - + rules: List[RuleConfig] run_id: Optional[str] organization_id: Optional[str] + overwrite_rules: bool end_stream_on_error: bool def __init__( @@ -37,25 +43,44 @@ def __init__( channel: SiftChannel, config: TelemetryConfig, run_id: Optional[str] = None, + overwrite_rules: bool = False, end_stream_on_error: bool = False, ): - self.ingestion_config = self.__class__.__get_or_create_ingestion_config(channel, config) + ingestion_config = self.__class__.get_or_create_ingestion_config(channel, config) + + self.__class__.validate_and_update_channels( + channel, ingestion_config.asset_id, config.flows + ) + + self.__class__.update_flow_configs( + channel, ingestion_config.ingestion_config_id, config.flows + ) + + if not overwrite_rules: + self.__class__.validate_rules_synchronized( + channel, ingestion_config.asset_id, config.rules + ) + + if len(config.rules) > 0: + update_rules(channel, ingestion_config.asset_id, config.rules) + + self.rules = config.rules + self.ingestion_config = ingestion_config self.asset_name = config.asset_name self.transport_channel = channel self.run_id = run_id self.organization_id = config.organization_id self.end_stream_on_error = end_stream_on_error - - # TODO... flows can have the same name... self.flow_configs_by_name = {flow.name: flow for flow in config.flows} def ingest(self, *requests: IngestWithConfigDataStreamRequest): - # TODO: Add logic to re-establish connection if channel has been closed due to idle timeout - + """ + Perform data ingestion. 
+ """ svc = IngestServiceStub(self.transport_channel) svc.IngestWithConfigDataStream(iter(requests)) - def start_run( + def attach_run( self, channel: SiftChannel, run_name: str, @@ -63,18 +88,28 @@ def start_run( organization_id: Optional[str] = None, tags: Optional[List[str]] = None, ): - svc = RunServiceStub(channel) - req = CreateRunRequest( - name=run_name, + """ + Retrieve an existing run or create one to use during this period of ingestion. + """ + run_id = get_run_id_by_name(channel, run_name) + + if run_id is not None: + self.run_id = run_id + return + + self.run_id = create_run( + channel=channel, + run_name=run_name, description=description or "", organization_id=organization_id or "", - tags=tags, + tags=tags or [], ) - res = cast(CreateRunResponse, svc.CreateRun(req)) - self.run_id = res.run.run_id - def end_run(self): - # TODO: Should hit the stop run endpoint + def detach_run(self): + """ + Detach run from this period of ingestion. Subsequent data ingested won't be associated with + the run being detached. + """ self.run_id = None def try_create_ingestion_request( @@ -83,27 +118,31 @@ def try_create_ingestion_request( timestamp: datetime, channel_values: List[ChannelValue], ) -> IngestWithConfigDataStreamRequest: + """ + Creates an ingestion request for a flow that must exist in `flow_configs_by_name`. This method + performs a series of client-side validations and will return a `IngestionValidationError` if any validations fail. + """ flow_config = self.flow_configs_by_name.get(flow_name) if flow_config is None: - raise ValueError(f"A flow config of name '{flow_name}' could not be found.") + raise IngestionValidationError( + f"A flow config of name '{flow_name}' could not be found." 
+ ) channel_values_by_fqn: Dict[str, ChannelValue] = {} for channel_value in channel_values: - name = channel_value["channel_name"] - component = channel_value.get("component") - fqn = FlowConfig.compute_fqn(name, component) + fqn = channel_fqn(channel_value) if channel_values_by_fqn.get(fqn, None) is None: channel_values_by_fqn[fqn] = channel_value else: - raise ValueError(f"Encountered multiple values for {fqn}") + raise IngestionValidationError(f"Encountered multiple values for {fqn}") values: List[IngestWithConfigDataChannelValue] = [] for channel in flow_config.channels: - fqn = FlowConfig.compute_fqn(channel.name, channel.component) + fqn = channel_fqn(channel) channel_value = channel_values_by_fqn.pop(fqn, None) if channel_value is None: @@ -115,18 +154,18 @@ def try_create_ingestion_request( if is_data_type(value, channel.data_type): values.append(value) else: - raise ValueError( + raise IngestionValidationError( f"Expected value for `{channel.name}` to be a '{channel.data_type}'." ) if len(channel_values_by_fqn) > 0: unexpected_channels = [name for name in channel_values_by_fqn.keys()] - raise ValueError( - f"Unexpected channels for flow '{flow_name}' or 'component' field missing for channel: {unexpected_channels}" + raise IngestionValidationError( + f"Unexpected channel(s) for flow '{flow_name}': {unexpected_channels}" ) if timestamp.tzname() != "UTC": - raise ValueError( + raise IngestionValidationError( f"Expected 'timestamp' to be in UTC but it is in {timestamp.tzname()}." ) @@ -141,6 +180,10 @@ def create_ingestion_request( timestamp: datetime, channel_values: List[IngestWithConfigDataChannelValue], ) -> IngestWithConfigDataStreamRequest: + """ + Creates an ingestion request for a flow that must exist in `flow_configs_by_name`. This method + does not do any sort of client-side validation and is recommended to use if performance is required. 
+ """ timestamp_pb = Timestamp() timestamp_pb.FromDatetime(timestamp) @@ -155,8 +198,68 @@ def create_ingestion_request( ) @staticmethod - def __get_or_create_ingestion_config(channel: SiftChannel, config: TelemetryConfig): - # TODO: Handle case where new Flows are added to an existing ingestion config + def update_flow_configs( + channel: SiftChannel, ingestion_config_id: str, flows: List[FlowConfig] + ): + """ + Queries flow configs from Sift and does a check to see if there are any new flow configs that need to be created. + """ + if len(flows) == 0: + return + + registered_flow_names = set(get_ingestion_config_flow_names(channel, ingestion_config_id)) + + flows_to_create = [] + + for flow in flows: + if flow.name in registered_flow_names: + continue + + flows_to_create.append(flow) + + if len(flows_to_create) > 0: + create_flow_configs(channel, ingestion_config_id, flows_to_create) + + @staticmethod + def validate_and_update_channels( + transport_channel: SiftChannel, asset_id: str, flows: List[FlowConfig] + ): + """ + There isn't an update channels API yet. + + For now this function will just ensure that people aren't changing channel data-types for + an existing channel given by component and name. + + Changing the name/component of a channel will just result in a new channel. + + We'll want to add support to update description, unit, and whatever else. + """ + if len(flows) == 0: + return + + existing_channels = get_asset_channels(transport_channel, asset_id) + channel_by_fqn = {channel_fqn(c): c for c in existing_channels} + + for flow in flows: + for channel in flow.channels: + already_created_channel = channel_by_fqn.get(channel.fqn()) + + if already_created_channel is None: + continue + + if already_created_channel.data_type != channel.data_type: + raise IngestionValidationError( + f"Cannot change the data-type of channel '{channel.fqn()}'." 
+ ) + + @staticmethod + def get_or_create_ingestion_config( + channel: SiftChannel, config: TelemetryConfig + ) -> IngestionConfig: + """ + Retrieves an existing ingestion config or creates a new one. + """ + ingestion_config = get_ingestion_config_by_client_key(channel, config.ingestion_client_key) if ingestion_config is not None: @@ -169,3 +272,35 @@ def __get_or_create_ingestion_config(channel: SiftChannel, config: TelemetryConf config.ingestion_client_key, config.organization_id, ) + + @staticmethod + def validate_rules_synchronized( + transport_channel: SiftChannel, + asset_id: str, + rule_configs: List[RuleConfig], + ): + """ + Ensures that rules defined in the telemetry config and the rules in Sift are in sync, otherwise error. + Namely, if a rule was added via a Sift UI and wasn't added immediately to the telemetry config, then + this will raise an exception. + """ + if len(rule_configs) == 0: + return + + rules_json = get_asset_rules_json(transport_channel, asset_id) + + rule_names_from_config = set() + + for rule_config in rule_configs: + rule_names_from_config.add(rule_config.name) + + for rule_json in rules_json: + rule_name: str = rule_json.get("name", "") + + if len(rule_name) == 0: + raise IngestionValidationError("Encountered rule without a name from Sift API.") + + if rule_name not in rule_names_from_config: + raise IngestionValidationError( + f"Encountered rule '{rule_name}' on asset '{asset_id}' not found in local telemetry config. Add it." 
+ ) diff --git a/python/lib/sift_py/ingestion/impl/ingest_test.py b/python/lib/sift_py/ingestion/impl/ingest_test.py new file mode 100644 index 00000000..7865ed6c --- /dev/null +++ b/python/lib/sift_py/ingestion/impl/ingest_test.py @@ -0,0 +1,474 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Callable + +import pytest +import sift.common.type.v1.channel_data_type_pb2 as channel_pb +from pytest_mock import MockFixture +from sift.channels.v2.channels_pb2 import Channel as ChannelPb +from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig as IngestionConfigPb +from sift_internal.test_util.channel import MockChannel +from sift_py.ingestion.channel import ( + ChannelConfig, + ChannelDataType, + double_value, + int32_value, + string_value, +) +from sift_py.ingestion.config.telemetry import TelemetryConfig +from sift_py.ingestion.flow import FlowConfig +from sift_py.ingestion.impl.channel import get_asset_channels +from sift_py.ingestion.impl.error import IngestionValidationError +from sift_py.ingestion.impl.ingest import ( + IngestionServiceImpl, + create_flow_configs, + get_ingestion_config_flow_names, +) +from sift_py.ingestion.impl.ingestion_config import ( + create_ingestion_config, + get_ingestion_config_by_client_key, +) +from sift_py.ingestion.impl.rule import get_asset_rules_json, update_rules +from sift_py.ingestion.rule.config import RuleActionCreateDataReviewAnnotation, RuleConfig + +SUBJECT_MODULE = "sift_py.ingestion.impl.ingest" + + +def _mock_path(fn: Callable) -> str: + return f"{SUBJECT_MODULE}.{fn.__name__}" + + +def test_ingestion_service_update_flow_configs_updates_flows(mocker: MockFixture): + """ + Tests to ensure that if a user adds a new flow to the telemetry config, + `update_flow_configs` will ensure that it gets created. 
+ """ + ingestion_config_id = "ingestion-config-id" + + flow_a = FlowConfig( + name="flow_a", + channels=[ + ChannelConfig( + name="channel_a", + data_type=ChannelDataType.DOUBLE, + ), + ], + ) + + flow_b = FlowConfig( + name="flow_b", + channels=[ + ChannelConfig( + name="channel_b", + data_type=ChannelDataType.DOUBLE, + ), + ], + ) + + flows_loaded_from_config = [flow_a, flow_b] + + flow_names_queried_from_api = ["flow_a"] + + mock_validate_and_update_channels = mocker.patch.object( + IngestionServiceImpl, "validate_and_update_channels" + ) + mock_validate_and_update_channels.return_value = None + + mock_get_ingestion_config_flow_names = mocker.patch(_mock_path(get_ingestion_config_flow_names)) + mock_get_ingestion_config_flow_names.return_value = flow_names_queried_from_api + + mock_create_flow_configs = mocker.patch(_mock_path(create_flow_configs)) + mock_create_flow_configs.return_value = None + + mock_channel = MockChannel() + IngestionServiceImpl.update_flow_configs( + mock_channel, ingestion_config_id, flows_loaded_from_config + ) + mock_create_flow_configs.assert_called_once_with(mock_channel, ingestion_config_id, [flow_b]) + + +def test_ingestion_service_get_or_create_ingestion_config_retrieves_existing(mocker: MockFixture): + """ + Ensure that if an ingestion config is queried by client key, a new one is not created. 
+ """ + + mock_ingestion_config = IngestionConfigPb( + ingestion_config_id="ingestion-config-id", + asset_id="asset-id", + client_key="client-key", + ) + + mock_telemetry_config = TelemetryConfig( + asset_name="asset_name", + ingestion_client_key=mock_ingestion_config.client_key, + ) + + mock_validate_and_update_channels = mocker.patch.object( + IngestionServiceImpl, "validate_and_update_channels" + ) + mock_validate_and_update_channels.return_value = None + + mock_get_ingestion_config_by_client_key = mocker.patch( + _mock_path(get_ingestion_config_by_client_key) + ) + mock_get_ingestion_config_by_client_key.return_value = mock_ingestion_config + + mock_create_ingestion_config = mocker.patch(_mock_path(create_ingestion_config)) + + mock_channel = MockChannel() + + ingestion_config = IngestionServiceImpl.get_or_create_ingestion_config( + mock_channel, + mock_telemetry_config, + ) + + mock_create_ingestion_config.assert_not_called() + + assert ingestion_config.ingestion_config_id == mock_ingestion_config.ingestion_config_id + + +def test_ingestion_service_get_or_create_ingestion_config_create_if_not_exist(mocker: MockFixture): + """ + Ensure that if an ingestion config does not exist for a given client key then a new + ingestion config is created based on the telemetry config. 
+ """ + + mock_ingestion_config = IngestionConfigPb( + ingestion_config_id="ingestion-config-id", + asset_id="asset-id", + client_key="client-key", + ) + + mock_telemetry_config = TelemetryConfig( + asset_name="asset_name", + ingestion_client_key=mock_ingestion_config.client_key, + organization_id="my-org-id", + ) + + mock_get_ingestion_config_by_client_key = mocker.patch( + _mock_path(get_ingestion_config_by_client_key) + ) + mock_get_ingestion_config_by_client_key.return_value = None + + mock_create_ingestion_config = mocker.patch(_mock_path(create_ingestion_config)) + mock_create_ingestion_config.return_value = mock_ingestion_config + + mock_channel = MockChannel() + + ingestion_config = IngestionServiceImpl.get_or_create_ingestion_config( + mock_channel, + mock_telemetry_config, + ) + + mock_create_ingestion_config.assert_called_once_with( + mock_channel, + mock_telemetry_config.asset_name, + [], + mock_ingestion_config.client_key, + mock_telemetry_config.organization_id, + ) + + assert ingestion_config.ingestion_config_id == mock_ingestion_config.ingestion_config_id + + +def test_ingestion_service_try_create_ingestion_request_validations(mocker: MockFixture): + """ + Tests all the different validations that happen when trying to create an ingestion request. 
+ """ + + voltage_channel = ChannelConfig( + name="voltage", + component="motor", + data_type=ChannelDataType.DOUBLE, + ) + pressure_channel = ChannelConfig( + name="pressure", + data_type=ChannelDataType.INT_64, + ) + logs_channel = ChannelConfig( + name="logs", + data_type=ChannelDataType.STRING, + ) + + telemetry_config = TelemetryConfig( + asset_name="my-asset", + ingestion_client_key="my-client-key", + flows=[ + FlowConfig( + name="reading", + channels=[voltage_channel, pressure_channel], + ), + FlowConfig( + name="pressure", + channels=[pressure_channel], + ), + FlowConfig( + name="log", + channels=[logs_channel], + ), + ], + ) + + mock_ingestion_config = IngestionConfigPb( + ingestion_config_id="ingestion-config-id", + asset_id="my-asset-id", + client_key="my-client-key", + ) + + mock_get_or_create_ingestion_config = mocker.patch.object( + IngestionServiceImpl, "get_or_create_ingestion_config" + ) + mock_get_or_create_ingestion_config.return_value = mock_ingestion_config + + mock_validate_and_update_channels = mocker.patch.object( + IngestionServiceImpl, "validate_and_update_channels" + ) + mock_validate_and_update_channels.return_value = None + + mock_update_flow_configs = mocker.patch.object(IngestionServiceImpl, "update_flow_configs") + mock_update_flow_configs.return_value = None + + mock_update_rules = mocker.patch(_mock_path(update_rules)) + mock_update_rules.return_value = None + + transport_channel = MockChannel() + + svc = IngestionServiceImpl( + channel=transport_channel, + config=telemetry_config, + overwrite_rules=True, + ) + + # Non-existent flow + with pytest.raises(IngestionValidationError, match="could not be found"): + svc.try_create_ingestion_request( + flow_name="lerg", # typo + timestamp=datetime.now(timezone.utc), + channel_values=[ + {"channel_name": "logs", "value": string_value("foobar")}, + ], + ) + + # Duplicate values for channel + with pytest.raises(IngestionValidationError, match="multiple values"): + 
svc.try_create_ingestion_request( + flow_name="log", + timestamp=datetime.now(timezone.utc), + channel_values=[ + {"channel_name": "logs", "value": string_value("foobar")}, + {"channel_name": "logs", "value": string_value("foobar")}, + ], + ) + + # Wrong channel value type + with pytest.raises(IngestionValidationError, match="Expected value"): + svc.try_create_ingestion_request( + flow_name="log", + timestamp=datetime.now(timezone.utc), + channel_values=[ + {"channel_name": "logs", "value": int32_value(32)}, + ], + ) + + # Wrong channel for flow + with pytest.raises(IngestionValidationError, match="Unexpected channel"): + svc.try_create_ingestion_request( + flow_name="log", + timestamp=datetime.now(timezone.utc), + channel_values=[ + {"channel_name": "voltage", "value": double_value(32)}, + ], + ) + + +def test_ingestion_service_init_ensures_rules_synchonized(mocker: MockFixture): + """ + Ensures that rules in Sift match rules in config, otherwise an exception is + raised asking user to update their local config. 
Also test `overwrite_rules` + which will ignore the difference and replace all rules in Sift with what's + in the config + """ + voltage_channel = ChannelConfig( + name="voltage", + component="motor", + data_type=ChannelDataType.DOUBLE, + ) + pressure_channel = ChannelConfig( + name="pressure", + data_type=ChannelDataType.INT_64, + ) + logs_channel = ChannelConfig( + name="logs", + data_type=ChannelDataType.STRING, + ) + + rule_on_voltage = RuleConfig( + name="voltage_rule", + description="", + expression="$1 > 10", + channel_references={ + "$1": voltage_channel, + }, + action=RuleActionCreateDataReviewAnnotation( + assignee="bob@example.com", + tags=["motor"], + ), + ) + + rule_on_pressure = RuleConfig( + name="pressure_rule", + description="", + expression="$1 > 10", + channel_references={ + "$1": pressure_channel, + }, + action=RuleActionCreateDataReviewAnnotation( + assignee="bob@example.com", + tags=["barometer"], + ), + ) + + # This rule won't be in the config. It will be "returned" by the API. 
+ rule_on_logs = RuleConfig( + name="log_rule", + description="", + expression='contains($1, "ERROR")', + channel_references={ + "$1": logs_channel, + }, + action=RuleActionCreateDataReviewAnnotation( + assignee="bob@example.com", + tags=["log"], + ), + ) + + mock_ingestion_config = IngestionConfigPb( + ingestion_config_id="my-ingestion-config", + asset_id="my-asset-id", + client_key="my-client-key", + ) + + mock_get_or_create_ingestion_config = mocker.patch.object( + IngestionServiceImpl, "get_or_create_ingestion_config" + ) + mock_get_or_create_ingestion_config.return_value = mock_ingestion_config + + mock_validate_and_update_channels = mocker.patch.object( + IngestionServiceImpl, "validate_and_update_channels" + ) + mock_validate_and_update_channels.return_value = None + + mock_update_flow_configs = mocker.patch.object(IngestionServiceImpl, "update_flow_configs") + mock_update_flow_configs.return_value = None + + mock_get_asset_rules_json = mocker.patch(_mock_path(get_asset_rules_json)) + + mock_get_asset_rules_json.return_value = [ + rule_on_logs.as_json(), + rule_on_pressure.as_json(), + rule_on_voltage.as_json(), + ] + + telemetry_config = TelemetryConfig( + asset_name="my-asset", + ingestion_client_key=mock_ingestion_config.client_key, + flows=[ + FlowConfig( + name="reading", + channels=[voltage_channel, pressure_channel], + ), + FlowConfig( + name="pressure", + channels=[pressure_channel], + ), + FlowConfig( + name="log", + channels=[logs_channel], + ), + ], + rules=[rule_on_voltage, rule_on_pressure], + ) + + mock_channel = MockChannel() + + with pytest.raises(Exception, match="not found in local"): + _ = IngestionServiceImpl( + channel=mock_channel, + config=telemetry_config, + ) + + # Now we make sure that we can overwrite rules + mock_update_rules = mocker.patch(_mock_path(update_rules)) + mock_update_rules.return_value = None + + _ = IngestionServiceImpl( + channel=mock_channel, + config=telemetry_config, + overwrite_rules=True, + ) + + 
mock_update_rules.assert_called_once_with( + mock_channel, + mock_ingestion_config.asset_id, + telemetry_config.rules, + ) + + +def test_ingestion_service_validate_channels(mocker: MockFixture): + """ + Disallow updating channel-data type for an existing channel + """ + + # This is a previously created channel that already exists in Sift + existing_voltage_channel = ChannelPb( + name="voltage", + component="motor", + data_type=channel_pb.CHANNEL_DATA_TYPE_DOUBLE, + ) + + # For the same channel someone tried changing the channel data-type from the config + voltage_channel = ChannelConfig( + name="voltage", + component="motor", + data_type=ChannelDataType.INT_32, + ) + + mock_ingestion_config = IngestionConfigPb( + ingestion_config_id="my-ingestion-config", + asset_id="my-asset-id", + client_key="my-client-key", + ) + + mock_get_or_create_ingestion_config = mocker.patch.object( + IngestionServiceImpl, "get_or_create_ingestion_config" + ) + mock_get_or_create_ingestion_config.return_value = mock_ingestion_config + + mock_get_asset_channels = mocker.patch(_mock_path(get_asset_channels)) + mock_get_asset_channels.return_value = [existing_voltage_channel] + + mock_update_flow_configs = mocker.patch.object(IngestionServiceImpl, "update_flow_configs") + mock_update_flow_configs.return_value = None + + mock_get_asset_rules_json = mocker.patch(_mock_path(get_asset_rules_json)) + mock_get_asset_rules_json.return_value = None + + telemetry_config = TelemetryConfig( + asset_name="my-asset", + ingestion_client_key=mock_ingestion_config.client_key, + flows=[ + FlowConfig( + name="reading", + channels=[voltage_channel], + ), + ], + ) + + mock_channel = MockChannel() + + with pytest.raises(IngestionValidationError, match="Cannot change the data-type"): + _ = IngestionServiceImpl( + channel=mock_channel, + config=telemetry_config, + ) diff --git a/python/lib/sift_py/ingestion/impl/ingestion_config.py b/python/lib/sift_py/ingestion/impl/ingestion_config.py index 8f889911..805c0ef2 
100644 --- a/python/lib/sift_py/ingestion/impl/ingestion_config.py +++ b/python/lib/sift_py/ingestion/impl/ingestion_config.py @@ -1,23 +1,24 @@ -""" -Internal module: This module contains implementation details that are not meant to be -used by consumers of this library and are not garaunteed to be stable. -""" - -from ...grpc.transport import SiftChannel -from ..flow import FlowConfig -from sift_internal.convert.protobuf import try_cast_pb +from typing import List, Optional, cast + from sift.ingestion_configs.v1.ingestion_configs_pb2 import ( - IngestionConfig, + CreateIngestionConfigFlowsRequest, + CreateIngestionConfigFlowsResponse, CreateIngestionConfigRequest, CreateIngestionConfigResponse, + IngestionConfig, + ListIngestionConfigFlowsRequest, + ListIngestionConfigFlowsResponse, ListIngestionConfigsRequest, ListIngestionConfigsResponse, +) +from sift.ingestion_configs.v1.ingestion_configs_pb2 import ( FlowConfig as FlowConfigPb, ) from sift.ingestion_configs.v1.ingestion_configs_pb2_grpc import ( IngestionConfigServiceStub, ) -from typing import cast, List, Optional +from sift_py.grpc.transport import SiftChannel +from sift_py.ingestion.flow import FlowConfig def get_ingestion_config_by_client_key( @@ -58,7 +59,64 @@ def create_ingestion_config( asset_name=asset_name, client_key=client_key, organization_id=organization_id or "", - flows=[try_cast_pb(flow, FlowConfigPb) for flow in flows], + flows=[flow.as_pb(FlowConfigPb) for flow in flows], ) res = cast(CreateIngestionConfigResponse, svc.CreateIngestionConfig(req)) return res.ingestion_config + + +def get_ingestion_config_flow_names( + channel: SiftChannel, + ingestion_config_id: str, +) -> List[str]: + """ + Gets all names of flow configs of an ingestion config. 
+ """ + + svc = IngestionConfigServiceStub(channel) + + flows: List[str] = [] + + req = ListIngestionConfigFlowsRequest( + ingestion_config_id=ingestion_config_id, + page_size=1_000, + filter="", + ) + res = cast(ListIngestionConfigFlowsResponse, svc.ListIngestionConfigFlows(req)) + + for flow in res.flows: + flows.append(flow.name) + + page_token = res.next_page_token + + while len(page_token) > 0: + req = ListIngestionConfigFlowsRequest( + ingestion_config_id=ingestion_config_id, + page_size=1_000, + filter="", + page_token=page_token, + ) + res = cast(ListIngestionConfigFlowsResponse, svc.ListIngestionConfigFlows(req)) + + for flow in res.flows: + flows.append(flow.name) + + page_token = res.next_page_token + + return flows + + +def create_flow_configs( + channel: SiftChannel, + ingestion_config_id: str, + flow_configs: List[FlowConfig], +): + """ + Adds flow configs to an existing ingestion config. + """ + svc = IngestionConfigServiceStub(channel) + req = CreateIngestionConfigFlowsRequest( + ingestion_config_id=ingestion_config_id, + flows=[f.as_pb(FlowConfigPb) for f in flow_configs], + ) + _ = cast(CreateIngestionConfigFlowsResponse, svc.CreateIngestionConfigFlows(req)) diff --git a/python/lib/sift_py/ingestion/impl/rule.py b/python/lib/sift_py/ingestion/impl/rule.py new file mode 100644 index 00000000..926091a6 --- /dev/null +++ b/python/lib/sift_py/ingestion/impl/rule.py @@ -0,0 +1,49 @@ +import json +from typing import Any, Dict, List, Optional, cast + +from sift.rules.v1.rules_pb2 import ( + JsonRulesRequest, + UpdateJsonRulesRequest, + UpdateJsonRulesResponse, + ViewJsonRulesRequest, + ViewJsonRulesResponse, +) +from sift.rules.v1.rules_pb2_grpc import RuleServiceStub +from sift_internal.convert.json import to_json +from sift_py.grpc.transport import SiftChannel +from sift_py.ingestion.rule.config import RuleConfig + + +def get_asset_rules_json( + transport_channel: SiftChannel, + asset_id: str, +) -> List[Dict[str, Any]]: + svc = 
RuleServiceStub(transport_channel) + req = ViewJsonRulesRequest(asset_id=asset_id) + res = cast(ViewJsonRulesResponse, svc.ViewJsonRules(req)) + rules_json: List[Dict[str, Any]] = cast(list, json.loads(res.rules_json)) + return rules_json + + +def update_rules( + transport_channel: SiftChannel, + asset_id: str, + rule_configs: List[RuleConfig], + organization_id: Optional[str] = None, +): + """ + Updates a set of rules. Raises an exception if failure. + """ + svc = RuleServiceStub(transport_channel) + json_rules = to_json(rule_configs) + req = UpdateJsonRulesRequest( + request=JsonRulesRequest( + asset_id=asset_id, + rules_json=json_rules, + organization_id=organization_id or "", + ) + ) + res = cast(UpdateJsonRulesResponse, svc.UpdateJsonRules(req)) + + if not res.response.success: + raise Exception(f"Failed to load rules: {res.response.error_messages}") diff --git a/python/lib/sift_py/ingestion/impl/run.py b/python/lib/sift_py/ingestion/impl/run.py new file mode 100644 index 00000000..2c128580 --- /dev/null +++ b/python/lib/sift_py/ingestion/impl/run.py @@ -0,0 +1,45 @@ +from typing import List, Optional, cast + +from sift.runs.v2.runs_pb2 import ( + CreateRunRequest, + CreateRunResponse, + ListRunsRequest, + ListRunsResponse, +) +from sift.runs.v2.runs_pb2_grpc import RunServiceStub +from sift_py.grpc.transport import SiftChannel + + +def get_run_id_by_name( + channel: SiftChannel, + run_name: str, +) -> Optional[str]: + svc = RunServiceStub(channel) + req = ListRunsRequest( + filter=f'name=="{run_name}"', + page_size=1, + ) + res = cast(ListRunsResponse, svc.ListRuns(req)) + + if len(res.runs) == 0: + return None + + return res.runs[0].run_id + + +def create_run( + channel: SiftChannel, + run_name: str, + description: str, + organization_id: str, + tags: List[str], +) -> str: + svc = RunServiceStub(channel) + req = CreateRunRequest( + name=run_name, + description=description, + organization_id=organization_id, + tags=tags, + ) + res = cast(CreateRunResponse, 
svc.CreateRun(req))
+    return res.run.run_id
diff --git a/python/lib/sift_py/ingestion/rule/__init__.py b/python/lib/sift_py/ingestion/rule/__init__.py
new file mode 100644
index 00000000..9b03705c
--- /dev/null
+++ b/python/lib/sift_py/ingestion/rule/__init__.py
@@ -0,0 +1,3 @@
+"""
+Concerned with loading in rules to use during ingestion.
+"""
diff --git a/python/lib/sift_py/ingestion/rule/config.py b/python/lib/sift_py/ingestion/rule/config.py
new file mode 100644
index 00000000..3bf89b69
--- /dev/null
+++ b/python/lib/sift_py/ingestion/rule/config.py
@@ -0,0 +1,199 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Any, Dict, List, Optional, TypedDict
+
+from sift.annotations.v1.annotations_pb2 import AnnotationType
+from sift.rules.v1.rules_pb2 import ActionKind
+from sift_internal.convert.json import AsJson
+from sift_py.ingestion.channel import ChannelConfig, channel_fqn
+
+
+class RuleConfig(AsJson):
+    """
+    Defines a rule to be used during ingestion. If a rule's expression evaluates to a truthy value, then
+    a specific action will take place as specified by the `kind` attribute.
+
+    Attributes:
+        `name`:
+            Name of the rule.
+        `description`:
+            Description of the rule.
+        `expression`:
+            A CEL string expression, that, when evaluated to a truthy value, executes the `action`.
+        `action`:
+            The action to execute if the result of an `expression` evaluates to a truthy value.
+        `channel_references`:
+            Reference to channel dict. If an expression is "$1 < 10", then "$1" is the reference and thus should be the key in the dict.
+ """ + + name: str + description: str + expression: str + action: RuleAction + channel_references: Dict[str, ChannelConfig] + + def __init__( + self, + name: str, + description: str, + expression: str, + action: RuleAction, + channel_references: Dict[str, ChannelConfig] = {}, + sub_expressions: Dict[str, Any] = {}, + ): + self.name = name + self.description = description + self.action = action + self.channel_references = channel_references + self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions) + + def as_json(self) -> Any: + """ + Produces the appropriate JSON structure that's suitable for the Rules API. + """ + + hash_map: Dict[str, List[ExpressionChannelReference] | str | List[str] | None] = { + "name": self.name, + "description": self.description, + "expression": self.expression, + } + + channel_references: List[ExpressionChannelReference] = [] + for ref, channel_config in self.channel_references.items(): + channel_references.append( + { + "channel_reference": ref, + "channel_identifier": channel_fqn(channel_config), + } + ) + + hash_map["expression_channel_references"] = channel_references + + if isinstance(self.action, RuleActionCreateDataReviewAnnotation): + hash_map["type"] = RuleActionAnnotationKind.REVIEW.value + hash_map["assignee"] = self.action.assignee + + if self.action.assignee is not None and len(self.action.assignee) > 0: + hash_map["assignee"] = self.action.assignee + + if self.action.tags is not None and len(self.action.tags) > 0: + hash_map["tags"] = self.action.tags + + elif isinstance(self.action, RuleActionCreatePhaseAnnotation): + hash_map["type"] = RuleActionAnnotationKind.PHASE.value + + if self.action.tags is not None and len(self.action.tags) > 0: + hash_map["tags"] = self.action.tags + else: + raise TypeError(f"Unsupported rule action '{self.action.kind()}'.") + + return hash_map + + @staticmethod + def interpolate_sub_expressions(expression: str, sub_expressions: Dict[str, str]) -> str: + for ref, 
expr in sub_expressions.items():
+            if ref not in expression:
+                raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.")
+            if isinstance(expr, str):
+                expression = expression.replace(ref, f'"{expr}"')
+            else:
+                expression = expression.replace(ref, str(expr))
+
+        return expression
+
+
+class RuleAction(ABC):
+    @abstractmethod
+    def kind(self) -> RuleActionKind:
+        pass
+
+
+class RuleActionCreateDataReviewAnnotation(RuleAction):
+    """
+    Action to create a data-review annotation when a rule evaluates to a truthy value.
+
+    Attributes:
+        `tags`:
+            List of tag names to associate with the newly created data-review annotation.
+        `assignee`:
+            Email of user in organization to assign the newly created data-review annotation.
+    """
+
+    tags: Optional[List[str]]
+    assignee: Optional[str]
+
+    def __init__(self, assignee: Optional[str] = None, tags: Optional[List[str]] = None):
+        self.assignee = assignee
+        self.tags = tags
+
+    def kind(self) -> RuleActionKind:
+        return RuleActionKind.ANNOTATION
+
+
+class RuleActionCreatePhaseAnnotation(RuleAction):
+    """
+    Action to create a phase annotation when a rule evaluates to a truthy value.
+
+    Attributes:
+        `tags`:
+            List of tag names to associate with the newly created phase annotation.
+ """ + + tags: Optional[List[str]] + + def __init__(self, tags: Optional[List[str]] = None): + self.tags = tags + + def kind(self) -> RuleActionKind: + return RuleActionKind.ANNOTATION + + +class RuleActionAnnotationKind(Enum): + REVIEW = "review" + PHASE = "phase" + + @classmethod + def from_annotation_type(cls, annotation_type: AnnotationType) -> "RuleActionAnnotationKind": + if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE: + return cls.PHASE + return cls.PHASE + + @classmethod + def from_str(cls, val: str) -> "RuleActionAnnotationKind": + if val == cls.REVIEW.value: + return cls.REVIEW + elif val == cls.PHASE.value: + return cls.PHASE + else: + raise ValueError("Argument 'val' is not a valid annotation kind.") + + +class RuleActionKind(Enum): + NOTIFICATION = ActionKind.NOTIFICATION + ANNOTATION = ActionKind.ANNOTATION + + @classmethod + def from_str(cls, val: str) -> Optional["RuleActionKind"]: + if val == "ACTION_KIND_NOTIFICATION" or val == RuleActionKindStrRep.NOTIFICATION.value: + return cls.NOTIFICATION + elif val == "ACTION_KIND_ANNOTATION" or val == RuleActionKindStrRep.ANNOTATION.value: + return cls.ANNOTATION + + return None + + +class RuleActionKindStrRep(Enum): + NOTIFICATION = "notification" + ANNOTATION = "annotation" + + +class ExpressionChannelReference(TypedDict): + """ + `reference`: The channel reference (e.g. '$1') used in the expression. + `identifier`: The fully qualified channel name. See `sift_py.ingestion.channel.channel_fqn`. 
+ """ + + channel_reference: str + channel_identifier: str diff --git a/python/lib/sift_py/ingestion/rule/config_test.py b/python/lib/sift_py/ingestion/rule/config_test.py new file mode 100644 index 00000000..dd2eda36 --- /dev/null +++ b/python/lib/sift_py/ingestion/rule/config_test.py @@ -0,0 +1,72 @@ +from sift_py.ingestion.channel import ChannelConfig, ChannelDataType + +from .config import ( + RuleActionCreateDataReviewAnnotation, + RuleActionCreatePhaseAnnotation, + RuleConfig, +) + + +def test_rule_config_json(): + voltage_rule_expression = "$1 > 10" + voltage_rule_config = RuleConfig( + name="High Voltage", + description="Rock & Roll", + expression=voltage_rule_expression, + action=RuleActionCreatePhaseAnnotation(), + channel_references={ + "$1": ChannelConfig( + name="voltage", + data_type=ChannelDataType.DOUBLE, + ), + }, + ) + assert voltage_rule_config.expression == voltage_rule_expression + + overheating_rule_expression = '$1 == "Accelerating" && $2 > $3' + overheating_rule_config = RuleConfig( + name="overheating", + description="checks if vehicle overheats while accelerating", + expression=overheating_rule_expression, + action=RuleActionCreateDataReviewAnnotation( + tags=["foo", "bar"], + assignee="foobar@baz.com", + ), + channel_references={ + "$1": ChannelConfig( + name="vehicle_state", + component="motor", + data_type=ChannelDataType.INT_32, + ), + "$2": ChannelConfig( + name="temperature", + component="motor", + data_type=ChannelDataType.INT_32, + ), + }, + sub_expressions={ + "$3": 80, + }, + ) + assert overheating_rule_config.expression == '$1 == "Accelerating" && $2 > 80' + + contains_rule_expression = "contains($1, $2)" + contains_rule_config = RuleConfig( + name="contains", + description="checks if vehicle overheats while accelerating", + expression=contains_rule_expression, + action=RuleActionCreateDataReviewAnnotation( + tags=["foo", "bar"], + assignee="foobar@baz.com", + ), + channel_references={ + "$1": ChannelConfig( + name="log", + 
data_type=ChannelDataType.INT_32, + ), + }, + sub_expressions={ + "$2": "Error", + }, + ) + assert contains_rule_config.expression == 'contains($1, "Error")' diff --git a/python/lib/sift_py/ingestion/service.py b/python/lib/sift_py/ingestion/service.py index c49c566d..dcadd8f4 100644 --- a/python/lib/sift_py/ingestion/service.py +++ b/python/lib/sift_py/ingestion/service.py @@ -1,21 +1,47 @@ from __future__ import annotations -from ..grpc.transport import SiftChannel -from .config import TelemetryConfig -from ..ingestion.flow import FlowConfig -from .channel import ChannelValue + +from datetime import datetime +from typing import Dict, List, Optional + from sift.ingest.v1.ingest_pb2 import ( IngestWithConfigDataChannelValue, IngestWithConfigDataStreamRequest, ) from sift.ingestion_configs.v1.ingestion_configs_pb2 import IngestionConfig -from typing import Dict, List, Optional -from .impl.ingest import IngestionServiceImpl -from datetime import datetime +from sift_py.grpc.transport import SiftChannel +from sift_py.ingestion.channel import ChannelValue +from sift_py.ingestion.config.telemetry import TelemetryConfig +from sift_py.ingestion.flow import FlowConfig +from sift_py.ingestion.impl.ingest import IngestionServiceImpl class IngestionService(IngestionServiceImpl): """ A fully configured service that, when instantiated, is ready to start ingesting data. + + Attributes: + `transport_channel`: + A gRPC transport channel. Prefer to use `SiftChannel`. + `ingestion_config`: + The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this. + `asset_name`: + The name of the asset to telemeter. + `flow_configs_by_name`: + A mapping of flow config name to the actual flow config. + `run_id`: + The ID of the optional run to associated ingested data with. + `organization_id`: + ID of the organization of the user. 
+        `overwrite_rules`:
+            If there are rules in Sift that aren't found in the local telemetry config, then initializing
+            an `IngestionService` will raise an exception advising the user to update their telemetry config
+            with the missing rule before proceeding. Setting this field to `True` will replace all rules currently
+            in Sift with the rules in the telemetry config.
+        `end_stream_on_error`:
+            By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
+            won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
+            will ensure that any errors that occur during ingestion are returned immediately, terminating the stream. This
+            is useful for debugging purposes.
     """
 
     transport_channel: SiftChannel
@@ -24,6 +50,7 @@ class IngestionService(IngestionServiceImpl):
     flow_configs_by_name: Dict[str, FlowConfig]
     run_id: Optional[str]
     organization_id: Optional[str]
+    overwrite_rules: bool
     end_stream_on_error: bool
 
     def __init__(
@@ -31,9 +58,10 @@ def __init__(
         channel: SiftChannel,
         config: TelemetryConfig,
         run_id: Optional[str] = None,
+        overwrite_rules: bool = False,
         end_stream_on_error: bool = False,
     ):
-        super().__init__(channel, config, run_id, end_stream_on_error)
+        super().__init__(channel, config, run_id, overwrite_rules, end_stream_on_error)
 
     def ingest(self, *requests: IngestWithConfigDataStreamRequest):
         """
@@ -41,7 +69,7 @@ def ingest(self, *requests: IngestWithConfigDataStreamRequest):
         """
         super().ingest(*requests)
 
-    def start_run(
+    def attach_run(
         self,
         channel: SiftChannel,
         run_name: str,
@@ -50,15 +78,16 @@ def start_run(
         tags: Optional[List[str]] = None,
     ):
         """
-        Create a run to use as part of the call to `ingest`.
+        Retrieve an existing run or create one to use during this period of ingestion.
""" - super().start_run(channel, run_name, description, organization_id, tags) + super().attach_run(channel, run_name, description, organization_id, tags) - def end_run(self): + def detach_run(self): """ - End the current run if any and don't include it in subsequent calls to `ingest`. + Detach run from this period of ingestion. Subsequent data ingested won't be associated with + the run being detached. """ - super().end_run() + super().detach_run() def try_create_ingestion_request( self, diff --git a/python/lib/sift_py/rule/__init__.py b/python/lib/sift_py/rule/__init__.py deleted file mode 100644 index c0344678..00000000 --- a/python/lib/sift_py/rule/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -""" -Concerned with the Sift Rules API. -""" diff --git a/python/pyproject.toml b/python/pyproject.toml index 30c2e1a3..829af376 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -3,6 +3,7 @@ name = "sift_py" version = "0.1" description = "Python client library for the Sift API" dependencies = [ + "typing-extensions==4.12.2", # https://grpc.github.io/grpc/python/ "grpcio==1.64.1", # https://googleapis.dev/python/protobuf/latest/ @@ -13,11 +14,17 @@ dependencies = [ "PyYAML==6.0.1", "types-PyYAML==6.0.12.20240311", ] +requires-python = ">=3.9" [project.optional-dependencies] development = [ - "pytest", # test framework - "ruff", # formatter + linter + # testing tools + "pytest", + "pytest-mock", + "grpcio-testing", + + # formatter + linter + "ruff", ] [build-system] @@ -62,3 +69,6 @@ exclude = [ "lib/google", "lib/protoc_gen_openapiv2", ] + +[tool.ruff.lint] +select = ["F", "W", "I", "N", "TID"] diff --git a/python/scripts/dev b/python/scripts/dev index 0568a5f8..b926e2ca 100755 --- a/python/scripts/dev +++ b/python/scripts/dev @@ -2,7 +2,10 @@ usage() { cat<