Skip to content

Commit

Permalink
python(feature): rules (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis authored Jun 20, 2024
1 parent 6ff1523 commit 4d3eb96
Show file tree
Hide file tree
Showing 39 changed files with 2,304 additions and 363 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: "3.8"
python-version: "3.9"

- name: Pip install
working-directory: python
Expand Down
6 changes: 4 additions & 2 deletions examples/python/annotations/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

credentials = grpc.ssl_channel_credentials()
call_credentials = grpc.access_token_call_credentials(API_KEY)
composite_credentials = grpc.composite_channel_credentials(credentials, call_credentials)
composite_credentials = grpc.composite_channel_credentials(
credentials, call_credentials
)

with grpc.secure_channel(BASE_URI, composite_credentials) as channel:
annotation_service = AnnotationServiceStub(channel)
request = ListAnnotationsRequest(filter=f"name.matches(\"(?i){name}\")")
request = ListAnnotationsRequest(filter=f'name.matches("(?i){name}")')
response = annotation_service.ListAnnotations(request)
print(response)
channel.close()
24 changes: 18 additions & 6 deletions examples/python/ingest_with_config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sift.ingestion_configs.v1.ingestion_configs_pb2 as ingestconf
import sift.runs.v2.runs_pb2 as runpb


def main():
"""
This is an example script demonstrating how to use Sift's IngestService_IngestWithConfigDataStream
Expand Down Expand Up @@ -39,12 +40,14 @@ def main():
with ingest.use_secure_channel(api_key, base_uri) as channel:
channel_configs = []

for channel_config in ExampleTestRunConfig.CHANNEL_CONFIG_PARAMS:
for channel_config in ExampleTestRunConfig.CHANNEL_CONFIG_PARAMS:
name, component, desc, unit = channel_config
conf = ingest.create_double_type_channel_config(name, component, desc, unit)
channel_configs.append(conf)

flow_config = ingest.create_flow_config(ExampleTestRunConfig.FLOW_NAME, *channel_configs)
flow_config = ingest.create_flow_config(
ExampleTestRunConfig.FLOW_NAME, *channel_configs
)

print("Creating ingestion config... ", end="")
ingestion_config = ingest.create_ingestion_config(
Expand All @@ -67,12 +70,15 @@ def main():
print(f"ok [run_id {run.run_id}]")

print("Beginning ingestion...")
ingestion_simulator = create_ingestion_simulator(run, ingestion_config, flow_config)
ingestion_simulator = create_ingestion_simulator(
run, ingestion_config, flow_config
)
ingest.ingest_with_config(channel, ingestion_simulator)
print("Ingestion completed.")

channel.close()


class ExampleTestRunConfig:
"""
This is a sample of various properties we'll use to constitute an asset, its associated channels,
Expand All @@ -87,11 +93,14 @@ class ExampleTestRunConfig:
FLOW_NAME = "example_flow"

# (name, component, description, units)
CHANNEL_CONFIG_PARAMS: List[Tuple[str, Optional[str], Optional[str], Optional[str]]] = [
CHANNEL_CONFIG_PARAMS: List[
Tuple[str, Optional[str], Optional[str], Optional[str]]
] = [
("pressure", None, "pressure applied", "mmHg"),
("velocity", "mainmotor", None, "m/s"),
]


def create_ingestion_simulator(
run: runpb.Run,
ingestion_config: ingestconf.IngestionConfig,
Expand All @@ -103,7 +112,7 @@ def create_ingestion_simulator(
some amount of data points (100 by default) 5ms apart that will get ingested into the Sift API.
"""

current_timestamp = run.start_time
current_timestamp = run.start_time
total_messages_sent = 0

for i in range(num_data_points):
Expand All @@ -127,9 +136,12 @@ def create_ingestion_simulator(
request.channel_values.append(velocity)

total_messages_sent += 1
print(f"Sending message [time={timestamp.ToJsonString()} run={run.run_id} total_messages_sent={total_messages_sent}]")
print(
f"Sending message [time={timestamp.ToJsonString()} run={run.run_id} total_messages_sent={total_messages_sent}]"
)

yield request


if __name__ == "__main__":
main()
27 changes: 21 additions & 6 deletions examples/python/ingest_with_config/sift_ingestion_utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
from google.protobuf import timestamp_pb2
from typing import Any, cast, Generator, Optional
import grpc
from sift.common.type.v1.channel_data_type_pb2 import ChannelDataType
from sift.common.type.v1.channel_data_type_pb2 import ChannelDataType
import sift.ingest.v1.ingest_pb2 as ingest
import sift.ingest.v1.ingest_pb2_grpc as ingest_grpc
import sift.ingestion_configs.v1.ingestion_configs_pb2 as ingestconf
import sift.ingestion_configs.v1.ingestion_configs_pb2_grpc as ingestconf_grpc
import sift.runs.v2.runs_pb2 as run
import sift.runs.v2.runs_pb2_grpc as run_grpc


def use_secure_channel(api_key: str, base_uri: str) -> grpc.Channel:
"""
Produces channel that is used to create a secure connection to a gRPC server.
This is intended to be used by all stubs.
"""
credentials = grpc.ssl_channel_credentials()
call_credentials = grpc.access_token_call_credentials(api_key)
composite_credentials = grpc.composite_channel_credentials(credentials, call_credentials)
return grpc.secure_channel(base_uri,composite_credentials)
composite_credentials = grpc.composite_channel_credentials(
credentials, call_credentials
)
return grpc.secure_channel(base_uri, composite_credentials)


def create_double_type_channel_config(
name: str,
Expand All @@ -42,7 +46,10 @@ def create_double_type_channel_config(

return config

def create_flow_config(flow_name: str, *channel_configs: ingestconf.ChannelConfig) -> ingestconf.FlowConfig:

def create_flow_config(
flow_name: str, *channel_configs: ingestconf.ChannelConfig
) -> ingestconf.FlowConfig:
"""
Creates a flow config that describes a group of channels that will telemeter data.
"""
Expand All @@ -53,6 +60,7 @@ def create_flow_config(flow_name: str, *channel_configs: ingestconf.ChannelConfi

return config


def create_ingestion_config(
channel: grpc.Channel,
asset_name: str,
Expand All @@ -67,10 +75,13 @@ def create_ingestion_config(
for flow_config in flow_configs:
request.flows.append(flow_config)

response = ingestconf_grpc.IngestionConfigServiceStub(channel).CreateIngestionConfig(request)
response = ingestconf_grpc.IngestionConfigServiceStub(
channel
).CreateIngestionConfig(request)

return cast(ingestconf.CreateIngestionConfigResponse, response).ingestion_config


def create_run(
channel: grpc.Channel,
name: str,
Expand Down Expand Up @@ -102,6 +113,7 @@ def create_run(

return cast(run.CreateRunResponse, response).run


def ingest_with_config(
channel: grpc.Channel,
ingestion_iter: Generator[ingest.IngestWithConfigDataStreamRequest, Any, None],
Expand All @@ -111,10 +123,13 @@ def ingest_with_config(
Data should be available to view shortly after this function concludes
"""
try:
ingest_grpc.IngestServiceStub(channel).IngestWithConfigDataStream(ingestion_iter)
ingest_grpc.IngestServiceStub(channel).IngestWithConfigDataStream(
ingestion_iter
)
except Exception as e:
print(f"Something went wrong during ingestion: {e}")


def delete_run(run_id: str, api_key: str, base_uri: str):
"""
Handy utility to delete your test runs
Expand Down
4 changes: 4 additions & 0 deletions python/examples/rule_expressions.example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
log_substring_contains:
contains($1, $2)
is_even:
mod($1, 2) == 0
42 changes: 37 additions & 5 deletions python/examples/telemetry_config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@ ingestion_client_key: lunar_vehicle_426
channels:
log_channel: &log_channel
name: log
data_type: CHANNEL_DATA_TYPE_STRING
data_type: string
description: asset logs

velocity_channel: &velocity_channel
name: velocity
data_type: CHANNEL_DATA_TYPE_DOUBLE
data_type: double
description: speed
unit: Miles Per Hour
component: mainmotor

voltage_channel: &voltage_channel
name: voltage
data_type: CHANNEL_DATA_TYPE_INT_32
data_type: int32
description: voltage at the source
unit: Volts

vehicle_state_channel: &vehicle_state_channel
name: vehicle_state
data_type: CHANNEL_DATA_TYPE_ENUM
data_type: enum
description: vehicle state
unit: vehicle state
enum_types:
Expand All @@ -35,7 +35,7 @@ channels:

gpio_channel: &gpio_channel
name: gpio
data_type: CHANNEL_DATA_TYPE_BIT_FIELD
data_type: bit_field
description: on/off values for pins on gpio
bit_field_elements:
- name: 12v
Expand All @@ -51,6 +51,37 @@ channels:
index: 7
bit_count: 1

rules:
- name: overheating
description: Checks for vehicle overheating
expression: $1 == "Accelerating" && $2 > 80
channel_references:
- $1: *vehicle_state_channel
- $2: *voltage_channel
type: review

- name: speeding
description: Checks high vehicle speed
type: phase
expression: $1 > 20
channel_references:
- $1: *velocity_channel

- name: failures
description: Checks for failure logs
type: review
assignee: [email protected]
expression:
name: log_substring_contains
channel_references:
- $1: *log_channel
sub_expressions:
- $2: ERROR
tags:
- foo
- bar
- baz

flows:
- name: readings
channels:
Expand All @@ -67,3 +98,4 @@ flows:
- name: logs
channels:
- <<: *log_channel

2 changes: 1 addition & 1 deletion python/lib/sift_internal/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
INTERNAL MODULE
Everything inside of this module is not meant to be used publicly. Proceed at your own risk.
Everything inside of this module is not meant to be used publicly.
"""
24 changes: 24 additions & 0 deletions python/lib/sift_internal/convert/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

import json
from abc import ABC, abstractmethod
from typing import Any


class AsJson(ABC):
"""
Utility sub-types that require custom-serialization meant to be used in conjunction with the
`to_json` function. Sub-types should implement `as_json` which should return the object that
you want passed to `json.dumps`.
"""

@abstractmethod
def as_json(self) -> Any:
pass


def to_json(value: Any) -> str:
"""
Serializes `value` to a JSON string uses the `AsJson.as_json` implementation of the type.
"""
return json.dumps(value, default=lambda x: x.as_json())
37 changes: 18 additions & 19 deletions python/lib/sift_internal/convert/protobuf.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Generic, Type, TypeVar

from google.protobuf.message import Message
from typing import cast, Optional, Type, TypeVar

ProtobufMessage = Message

T = TypeVar("T", bound=ProtobufMessage)


class AsProtobuf(ABC):
class AsProtobuf(ABC, Generic[T]):
"""
Abstract base class used to create create sub-types that can be treated
as an object that can be converted into an instance of `ProtobufMessage`.
If there are multiple possible protobuf targets then `as_pb` may be overloaded.
"""

@abstractmethod
def as_pb(self, klass: Type[ProtobufMessage]) -> Optional[ProtobufMessage]:
def as_pb(self, klass: Type[T]) -> T:
"""
Performs the conversion into a sub-type of `ProtobufMessage`. Should return `None`
if conversion fails.
Performs the conversion into a sub-type of `ProtobufMessage`.
"""
pass


T = TypeVar("T", bound=ProtobufMessage)


def try_cast_pb(value: AsProtobuf, target_klass: Type[T]) -> T:
"""
Tries to cast the `value` to `target_klass`, otherwise, returns a `TypeError`.
"""
value_pb = value.as_pb(target_klass)
if isinstance(value_pb, target_klass):
return cast(target_klass, value_pb)
raise TypeError(
f"Expected a '{target_klass.__module__}{target_klass.__name__}' but got {value.__module__}{value.__class__.__name__}"
)
@classmethod
@abstractmethod
def from_pb(cls, message: Type[T]) -> T:
"""
Converts a protobuf object to the type of the sub-class class.
"""
pass
File renamed without changes.
Loading

0 comments on commit 4d3eb96

Please sign in to comment.