From 79a8dafa7af1ac33eb3c8fc7435f9556c12cc94c Mon Sep 17 00:00:00 2001 From: Benji Nguyen <45523555+solidiquis@users.noreply.github.com> Date: Fri, 12 Jul 2024 14:14:10 -0700 Subject: [PATCH] python(feature): downloading telemetry (#72) --- python/lib/sift_py/__init__.py | 63 +++ python/lib/sift_py/_internal/cel.py | 18 + python/lib/sift_py/_internal/channel.py | 5 + .../sift_py/_internal/convert/timestamp.py | 9 + .../sift_py/_internal/test_util/channel.py | 66 +++ python/lib/sift_py/_internal/time.py | 48 ++ python/lib/sift_py/data/__init__.py | 171 ++++++ python/lib/sift_py/data/_channel.py | 38 ++ python/lib/sift_py/data/_deserialize.py | 208 ++++++++ python/lib/sift_py/data/_deserialize_test.py | 134 +++++ python/lib/sift_py/data/_validate.py | 10 + python/lib/sift_py/data/error.py | 5 + python/lib/sift_py/data/query.py | 299 +++++++++++ python/lib/sift_py/data/service.py | 492 ++++++++++++++++++ python/lib/sift_py/data/service_test.py | 224 ++++++++ python/lib/sift_py/error.py | 11 + .../grpc/_async_interceptors/__init__.py | 0 .../sift_py/grpc/_async_interceptors/base.py | 72 +++ .../grpc/_async_interceptors/metadata.py | 36 ++ python/lib/sift_py/grpc/transport.py | 48 ++ python/lib/sift_py/ingestion/_service_test.py | 20 + python/lib/sift_py/ingestion/channel.py | 32 +- python/pyproject.toml | 7 +- 23 files changed, 2008 insertions(+), 8 deletions(-) create mode 100644 python/lib/sift_py/_internal/cel.py create mode 100644 python/lib/sift_py/_internal/channel.py create mode 100644 python/lib/sift_py/_internal/convert/timestamp.py create mode 100644 python/lib/sift_py/_internal/time.py create mode 100644 python/lib/sift_py/data/__init__.py create mode 100644 python/lib/sift_py/data/_channel.py create mode 100644 python/lib/sift_py/data/_deserialize.py create mode 100644 python/lib/sift_py/data/_deserialize_test.py create mode 100644 python/lib/sift_py/data/_validate.py create mode 100644 python/lib/sift_py/data/error.py create mode 100644 python/lib/sift_py/data/query.py create mode 100644 python/lib/sift_py/data/service.py create mode 100644 python/lib/sift_py/data/service_test.py create mode 100644 python/lib/sift_py/error.py create mode 100644 python/lib/sift_py/grpc/_async_interceptors/__init__.py create mode 100644 python/lib/sift_py/grpc/_async_interceptors/base.py create mode 100644 python/lib/sift_py/grpc/_async_interceptors/metadata.py diff --git a/python/lib/sift_py/__init__.py b/python/lib/sift_py/__init__.py index bd888694..ab1d1526 100644 --- a/python/lib/sift_py/__init__.py +++ b/python/lib/sift_py/__init__.py @@ -16,6 +16,7 @@ - [Sending data to Sift](#sending-data-to-sift) * [Ingestion Performance](#ingestion-performance) - [Buffered Ingestion](#buffered-ingestion) +* [Downloading Telemetry](#downloading-telemetry) * [More Examples](#more-examples) ## Introduction @@ -846,6 +847,68 @@ def nostromos_lv_426() -> TelemetryConfig: Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition for further details. +## Downloading Telemetry + +To download your telemetry locally you'll want to make use of the `sift_py.data` module. 
The module-level documentation
+contains more details, but here is an example script demonstrating how to download data for multiple channels, put them
+into a `pandas` data frame, and write the results out to a CSV:
+
+```python
+import asyncio
+import functools
+import pandas as pd
+from sift_py.data.query import ChannelQuery, DataQuery
+from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
+from sift_py.data.service import DataService
+
+
+async def channel_demo():
+    channel_config: SiftChannelConfig = {
+        "apikey": "my-key",
+        "uri": "sift-uri",
+    }
+
+    async with use_sift_async_channel(channel_config) as channel:
+        data_service = DataService(channel)
+
+        query = DataQuery(
+            asset_name="NostromoLV426",
+            start_time="2024-07-04T18:09:08.555-07:00",
+            end_time="2024-07-04T18:09:11.556-07:00",
+            channels=[
+                ChannelQuery(
+                    channel_name="voltage",
+                    run_name="[NostromoLV426].1720141748.047512"
+                ),
+                ChannelQuery(
+                    channel_name="velocity",
+                    component="mainmotors",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+                ChannelQuery(
+                    channel_name="gpio",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+            ],
+        )
+
+        result = await data_service.execute(query)
+
+        data_frames = [
+            pd.DataFrame(data.columns())
+            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
+        ]
+
+        merged_frame = functools.reduce(
+            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
+        )
+
+        merged_frame.to_csv("my_csv.csv")
+
+if __name__ == "__main__":
+    asyncio.run(channel_demo())
+```
+
 ## More Examples
 
 For more comprehensive examples demonstrating a little bit of everything, you may
diff --git a/python/lib/sift_py/_internal/cel.py b/python/lib/sift_py/_internal/cel.py
new file mode 100644
index 00000000..97090fe7
--- /dev/null
+++ b/python/lib/sift_py/_internal/cel.py
@@ -0,0 +1,18 @@
+"""
+Utilities to interact with APIs that have a CEL-based interface.
+"""
+
+from typing import Iterable
+
+
+def cel_in(field: str, values: Iterable[str]) -> str:
+    """
+    Produces a list membership CEL expression.
Example: + + ```python + > print(cel_in("name", ["foo", "bar"])) + name in ["foo", "bar"] + ``` + """ + items = ",".join([f'"{val}"' for val in values]) + return f"{field} in [{items}]" diff --git a/python/lib/sift_py/_internal/channel.py b/python/lib/sift_py/_internal/channel.py new file mode 100644 index 00000000..9aad9770 --- /dev/null +++ b/python/lib/sift_py/_internal/channel.py @@ -0,0 +1,5 @@ +from typing import Optional + + +def channel_fqn(name: str, component: Optional[str]) -> str: + return name if component is None or len(component) == 0 else f"{component}.{name}" diff --git a/python/lib/sift_py/_internal/convert/timestamp.py b/python/lib/sift_py/_internal/convert/timestamp.py new file mode 100644 index 00000000..8a67a3dc --- /dev/null +++ b/python/lib/sift_py/_internal/convert/timestamp.py @@ -0,0 +1,9 @@ +from datetime import datetime + +from google.protobuf.timestamp_pb2 import Timestamp + + +def to_pb_timestamp(timestamp: datetime) -> Timestamp: + timestamp_pb = Timestamp() + timestamp_pb.FromDatetime(timestamp) + return timestamp_pb diff --git a/python/lib/sift_py/_internal/test_util/channel.py b/python/lib/sift_py/_internal/test_util/channel.py index a23c7fba..13626c1b 100644 --- a/python/lib/sift_py/_internal/test_util/channel.py +++ b/python/lib/sift_py/_internal/test_util/channel.py @@ -1,5 +1,17 @@ +from collections.abc import AsyncIterable, Callable, Iterable +from typing import Any, Optional, Union + +import grpc +import grpc.aio as grpc_aio +from grpc.aio import Channel as AsyncChannel from grpc_testing import Channel +SerializingFunction = Callable[[Any], bytes] +DeserializingFunction = Callable[[bytes], Any] +DoneCallbackType = Callable[[Any], None] +RequestIterableType = Union[Iterable[Any], AsyncIterable[Any]] +ResponseIterableType = AsyncIterable[Any] + class MockChannel(Channel): """ @@ -68,3 +80,57 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): pass + + +class MockAsyncChannel(AsyncChannel): + async def __aenter__(self): + pass + + async def __aexit__(self, exc_type, exc_val, exc_tb): + pass + + async def close(self, grace: Optional[float] = None): + pass + + def get_state(self, try_to_connect: bool = False) -> grpc.ChannelConnectivity: ... + + async def wait_for_state_change( + self, + last_observed_state: grpc.ChannelConnectivity, + ) -> None: + return None + + async def channel_ready(self) -> None: + return None + + def unary_unary( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc_aio.UnaryUnaryMultiCallable: ... + + def unary_stream( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc_aio.UnaryStreamMultiCallable: ... + + def stream_unary( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc_aio.StreamUnaryMultiCallable: ... + + def stream_stream( + self, + method: str, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + _registered_method: Optional[bool] = False, + ) -> grpc_aio.StreamStreamMultiCallable: ... 
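A minimal sketch (illustrative only, not part of the diff) of how the internal helpers introduced in this patch — `cel_in`, `channel_fqn`, and `to_pb_timestamp` — behave:

```python
from datetime import datetime, timezone

from sift_py._internal.cel import cel_in
from sift_py._internal.channel import channel_fqn
from sift_py._internal.convert.timestamp import to_pb_timestamp

# Builds a CEL list-membership filter, e.g. for ListRuns/ListChannels request filters.
print(cel_in("name", ["voltage", "gpio"]))    # name in ["voltage","gpio"]

# Fully qualified channel name: "<component>.<name>" when a component is present.
print(channel_fqn("velocity", "mainmotors"))  # mainmotors.velocity
print(channel_fqn("gpio", None))              # gpio

# datetime -> protobuf Timestamp
ts = to_pb_timestamp(datetime(2024, 7, 4, 18, 9, 8, tzinfo=timezone.utc))
print(ts.seconds)
```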
diff --git a/python/lib/sift_py/_internal/time.py b/python/lib/sift_py/_internal/time.py
new file mode 100644
index 00000000..8ff926e9
--- /dev/null
+++ b/python/lib/sift_py/_internal/time.py
@@ -0,0 +1,48 @@
+from datetime import datetime, timezone
+from typing import Union, cast
+
+import pandas as pd
+from google.protobuf.timestamp_pb2 import Timestamp as TimestampPb
+
+
+def to_timestamp_nanos(arg: Union[TimestampPb, pd.Timestamp, datetime, str, int]) -> pd.Timestamp:
+    """
+    Converts a variety of time types to a pandas timestamp, which supports nanosecond precision.
+    """
+
+    if isinstance(arg, pd.Timestamp):
+        return arg
+    elif isinstance(arg, TimestampPb):
+        seconds = arg.seconds
+        nanos = arg.nanos
+
+        dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
+        ts = pd.Timestamp(dt)
+
+        return cast(pd.Timestamp, ts + pd.Timedelta(nanos, unit="ns"))
+
+    elif isinstance(arg, int):
+        dt = datetime.fromtimestamp(arg, tz=timezone.utc)
+        return cast(pd.Timestamp, pd.Timestamp(dt))
+
+    else:
+        return cast(pd.Timestamp, pd.Timestamp(arg))
+
+
+def to_timestamp_pb(arg: Union[datetime, str, int]) -> TimestampPb:
+    """
+    Mainly used for testing at the moment. If this is ever used for non-testing purposes,
+    it should probably be made more robust and support nanosecond precision.
+    """
+
+    ts = TimestampPb()
+
+    if isinstance(arg, datetime):
+        ts.FromDatetime(arg)
+        return ts
+    elif isinstance(arg, int):
+        ts.FromDatetime(datetime.fromtimestamp(arg))
+        return ts
+    else:
+        ts.FromDatetime(datetime.fromisoformat(arg))
+        return ts
diff --git a/python/lib/sift_py/data/__init__.py b/python/lib/sift_py/data/__init__.py
new file mode 100644
index 00000000..c086c447
--- /dev/null
+++ b/python/lib/sift_py/data/__init__.py
@@ -0,0 +1,171 @@
+"""
+This module contains tools to download telemetry from the Sift data API. The
+core components of this module are the `sift_py.data.service.DataService` and the
+`sift_py.data.query` module. The former is what's used to execute a data query,
+while the latter is what's used to actually construct the query. A typical query could look
+something like this:
+
+```python
+query = DataQuery(
+    asset_name="NostromoLV426",
+    start_time="2024-07-04T18:09:08.555-07:00",
+    end_time="2024-07-04T18:09:11.556-07:00",
+    sample_ms=16,
+    channels=[
+        ChannelQuery(
+            channel_name="voltage",
+            run_name="[NostromoLV426].1720141748.047512"
+        ),
+        ChannelQuery(
+            channel_name="velocity",
+            component="mainmotors",
+            run_name="[NostromoLV426].1720141748.047512",
+        ),
+        ChannelQuery(
+            channel_name="gpio",
+            run_name="[NostromoLV426].1720141748.047512",
+        ),
+    ],
+)
+```
+
+This query, once passed to the `sift_py.data.service.DataService.execute` method, will
+fetch data between `start_time` and `end_time` at the sampling rate given by `sample_ms`.
+
+> ⚠️ **Warning**: Note on Performance
+>
+> Currently the results of a query are all buffered in memory, so it is best to be mindful
+> of your memory limitations and overall performance requirements when requesting data
+> over a large time range with a low `sample_ms` (i.e. high-fidelity data). Full-fidelity
+> data is returned when `sample_ms` is set to `0`.
+
+The data API allows you to download telemetry for both regular channels and calculated
+channels. The following examples demonstrate how to download data for regular channels and
+calculated channels, respectively.
+
+* [Regular Channels](#regular-channels)
+* [Calculated Channels](#calculated-channels)
+
+## Regular Channels
+
+```python
+import asyncio
+import functools
+import pandas as pd
+from sift_py.data.query import ChannelQuery, DataQuery
+from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
+from sift_py.data.service import DataService
+
+
+async def channel_demo():
+    channel_config: SiftChannelConfig = {
+        "apikey": "my-key",
+        "uri": "sift-uri",
+    }
+
+    async with use_sift_async_channel(channel_config) as channel:
+        data_service = DataService(channel)
+
+        query = DataQuery(
+            asset_name="NostromoLV426",
+            start_time="2024-07-04T18:09:08.555-07:00",
+            end_time="2024-07-04T18:09:11.556-07:00",
+            channels=[
+                ChannelQuery(
+                    channel_name="voltage",
+                    run_name="[NostromoLV426].1720141748.047512"
+                ),
+                ChannelQuery(
+                    channel_name="velocity",
+                    component="mainmotors",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+                ChannelQuery(
+                    channel_name="gpio",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+            ],
+        )
+
+        result = await data_service.execute(query)
+
+        data_frames = [
+            pd.DataFrame(data.columns())
+            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
+        ]
+
+        merged_frame = functools.reduce(
+            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
+        )
+
+        merged_frame.to_csv("my_csv.csv")
+
+if __name__ == "__main__":
+    asyncio.run(channel_demo())
+```
+
+## Calculated Channels
+
+```python
+import asyncio
+import functools
+import pandas as pd
+from sift_py.data.query import CalculatedChannelQuery, DataQuery
+from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
+from sift_py.data.service import DataService
+
+
+async def channel_demo():
+    channel_config: SiftChannelConfig = {
+        "apikey": "my-key",
+        "uri": "sift-uri",
+    }
+
+    async with use_sift_async_channel(channel_config) as channel:
+        data_service = DataService(channel)
+
+        query = DataQuery(
+            asset_name="NostromoLV426",
+            start_time="2024-07-04T18:09:08.555-07:00",
+            end_time="2024-07-04T18:09:11.556-07:00",
+            channels=[
+                CalculatedChannelQuery(
+                    channel_key="calc-voltage",
+                    expression="$1 + 10",
+                    expression_channel_references=[
+                        {
+                            "reference": "$1",
+                            "channel_name": "voltage",
+                        },
+                    ],
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+                CalculatedChannelQuery(
+                    channel_key="calc-velocity",
+                    expression="$1 * 2",
+                    expression_channel_references=[
+                        {
+                            "reference": "$1",
+                            "channel_name": "velocity",
+                            "component": "mainmotors",
+                        },
+                    ],
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+            ],
+        )
+
+        result = await data_service.execute(query)
+        calc_voltage, calc_velocity = result.channels("calc-voltage", "calc-velocity")
+
+        calc_voltage_df = pd.DataFrame(calc_voltage.columns())
+        calc_velocity_df = pd.DataFrame(calc_velocity.columns())
+
+        merged_frame = pd.merge_asof(calc_voltage_df, calc_velocity_df, on="time")
+
+        merged_frame.to_csv("my_csv.csv")
+
+if __name__ == "__main__":
+    asyncio.run(channel_demo())
+```
+"""
diff --git a/python/lib/sift_py/data/_channel.py b/python/lib/sift_py/data/_channel.py
new file mode 100644
index 00000000..09f04923
--- /dev/null
+++ b/python/lib/sift_py/data/_channel.py
@@ -0,0 +1,38 @@
+from typing import Any, List
+
+import pandas as pd
+
+from sift_py.ingestion.channel import ChannelDataType
+
+
+class ChannelTimeSeries:
+    data_type: ChannelDataType
+    time_column: List[pd.Timestamp]
+    value_column: List[Any]
+
+    def __init__(
+        self,
+        data_type: ChannelDataType,
+        time_column: List[pd.Timestamp],
+        value_column:
List[Any], + ): + if len(time_column) != len(value_column): + raise Exception("Both arguments, `time_column` and `value_column` must equal lengths.") + + self.data_type = data_type + self.time_column = time_column + self.value_column = value_column + + def sort_time_series(self): + points = [(t, v) for t, v in zip(self.time_column, self.value_column)] + points.sort(key=lambda x: x[0]) + + time_column = [] + value_column = [] + + for ts, val in points: + time_column.append(ts) + value_column.append(val) + + self.time_column = time_column + self.value_column = value_column diff --git a/python/lib/sift_py/data/_deserialize.py b/python/lib/sift_py/data/_deserialize.py new file mode 100644 index 00000000..709c9164 --- /dev/null +++ b/python/lib/sift_py/data/_deserialize.py @@ -0,0 +1,208 @@ +from copy import deepcopy +from enum import Enum +from typing import List, Tuple, cast + +from google.protobuf.any_pb2 import Any +from sift.data.v1.data_pb2 import ( + BitFieldValues, + BoolValues, + DoubleValues, + EnumValues, + FloatValues, + Int32Values, + Int64Values, + Metadata, + StringValues, + Uint32Values, + Uint64Values, +) + +from sift_py._internal.time import to_timestamp_nanos +from sift_py.data._channel import ChannelTimeSeries +from sift_py.error import SiftError +from sift_py.ingestion.channel import ChannelDataType + + +class ChannelValues(Enum): + DOUBLE_VALUES = "sift.data.v1.DoubleValues" + FLOAT_VALUES = "sift.data.v1.FloatValues" + STRING_VALUES = "sift.data.v1.StringValues" + ENUM_VALUES = "sift.data.v1.EnumValues" + BIT_FIELD_VALUES = "sift.data.v1.BitFieldValues" + BOOL_VALUES = "sift.data.v1.BoolValues" + INT32_VALUES = "sift.data.v1.Int32Values" + INT64_VALUES = "sift.data.v1.Int64Values" + UINT32_VALUES = "sift.data.v1.Uint32Values" + UINT64_VALUES = "sift.data.v1.Uint64Values" + + +def try_deserialize_channel_data(channel_values: Any) -> List[Tuple[Metadata, ChannelTimeSeries]]: + if ChannelValues.DOUBLE_VALUES.value in channel_values.type_url: + double_values = cast(DoubleValues, DoubleValues.FromString(channel_values.value)) + metadata = double_values.metadata + + time_column = [] + double_value_column = [] + + for v in double_values.values: + time_column.append(to_timestamp_nanos(v.timestamp)) + double_value_column.append(v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, double_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.FLOAT_VALUES.value in channel_values.type_url: + float_values = cast(FloatValues, FloatValues.FromString(channel_values.value)) + metadata = float_values.metadata + + time_column = [] + float_value_column = [] + + for float_v in float_values.values: + time_column.append(to_timestamp_nanos(float_v.timestamp)) + float_value_column.append(float_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, float_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.STRING_VALUES.value in channel_values.type_url: + string_values = cast(StringValues, StringValues.FromString(channel_values.value)) + metadata = string_values.metadata + + time_column = [] + string_value_column = [] + + for string_v in string_values.values: + time_column.append(to_timestamp_nanos(string_v.timestamp)) + string_value_column.append(string_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, string_value_column + ) + return [(metadata, time_series)] + + elif 
ChannelValues.ENUM_VALUES.value in channel_values.type_url: + enum_values = cast(EnumValues, EnumValues.FromString(channel_values.value)) + metadata = enum_values.metadata + + time_column = [] + enum_value_column = [] + + for enum_v in enum_values.values: + time_column.append(to_timestamp_nanos(enum_v.timestamp)) + enum_value_column.append(enum_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, enum_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.BOOL_VALUES.value in channel_values.type_url: + bool_values = cast(BoolValues, BoolValues.FromString(channel_values.value)) + metadata = bool_values.metadata + + time_column = [] + bool_value_column = [] + + for bool_v in bool_values.values: + time_column.append(to_timestamp_nanos(bool_v.timestamp)) + bool_value_column.append(bool_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, bool_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.INT32_VALUES.value in channel_values.type_url: + int32_values = cast(Int32Values, Int32Values.FromString(channel_values.value)) + metadata = int32_values.metadata + + time_column = [] + int32_value_column = [] + + for int32_v in int32_values.values: + time_column.append(to_timestamp_nanos(int32_v.timestamp)) + int32_value_column.append(int32_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, int32_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.INT64_VALUES.value in channel_values.type_url: + int64_values = cast(Int64Values, Int64Values.FromString(channel_values.value)) + metadata = int64_values.metadata + + time_column = [] + int64_value_column = [] + + for int64_v in int64_values.values: + time_column.append(to_timestamp_nanos(int64_v.timestamp)) + int64_value_column.append(int64_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, int64_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.UINT32_VALUES.value in channel_values.type_url: + uint32_values = cast(Uint32Values, Uint32Values.FromString(channel_values.value)) + metadata = uint32_values.metadata + + time_column = [] + uint32_value_column = [] + + for uint32_v in uint32_values.values: + time_column.append(to_timestamp_nanos(uint32_v.timestamp)) + uint32_value_column.append(uint32_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, uint32_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.UINT64_VALUES.value in channel_values.type_url: + uint64_values = cast(Uint64Values, Uint64Values.FromString(channel_values.value)) + metadata = uint64_values.metadata + + time_column = [] + uint64_value_column = [] + + for uint64_v in uint64_values.values: + time_column.append(to_timestamp_nanos(uint64_v.timestamp)) + uint64_value_column.append(uint64_v.value) + + time_series = ChannelTimeSeries( + ChannelDataType.from_pb(metadata.data_type), time_column, uint64_value_column + ) + return [(metadata, time_series)] + + elif ChannelValues.BIT_FIELD_VALUES.value in channel_values.type_url: + bit_field_values = cast(BitFieldValues, BitFieldValues.FromString(channel_values.value)) + metadata = bit_field_values.metadata + data_type = ChannelDataType.from_pb(metadata.data_type) + channel_name = metadata.channel.name + + parsed_data: List[Tuple[Metadata, ChannelTimeSeries]] = [] + + for 
bit_field_element in bit_field_values.values: + md_copy = deepcopy(bit_field_values.metadata) + md_copy.channel.name = f"{channel_name}.{bit_field_element.name}" + + time_column = [] + bit_field_el_column = [] + + for bf_v in bit_field_element.values: + time_column.append(to_timestamp_nanos(bf_v.timestamp)) + bit_field_el_column.append(bf_v.value) + + time_series = ChannelTimeSeries(data_type, time_column, bit_field_el_column) + parsed_data.append((md_copy, time_series)) + + return parsed_data + + raise SiftError(f"Received an unknown channel-type '{channel_values.type_url}'.") diff --git a/python/lib/sift_py/data/_deserialize_test.py b/python/lib/sift_py/data/_deserialize_test.py new file mode 100644 index 00000000..78fcfc2e --- /dev/null +++ b/python/lib/sift_py/data/_deserialize_test.py @@ -0,0 +1,134 @@ +from google.protobuf.any_pb2 import Any +from sift.common.type.v1.channel_bit_field_element_pb2 import ChannelBitFieldElement +from sift.common.type.v1.channel_data_type_pb2 import ( + CHANNEL_DATA_TYPE_BIT_FIELD, + CHANNEL_DATA_TYPE_DOUBLE, +) +from sift.data.v1.data_pb2 import ( + BitFieldElementValues, + BitFieldValue, + BitFieldValues, + DoubleValue, + DoubleValues, + Metadata, +) + +from sift_py._internal.time import to_timestamp_nanos, to_timestamp_pb +from sift_py.data._deserialize import try_deserialize_channel_data + + +def test_try_deserialize_channel_data_double(): + metadata = Metadata( + data_type=CHANNEL_DATA_TYPE_DOUBLE, channel=Metadata.Channel(name="double-channel") + ) + + time_a = "2024-07-04T18:09:08.555-07:00" + time_b = "2024-07-04T18:09:09.555-07:00" + + double_values = DoubleValues( + metadata=metadata, + values=[ + DoubleValue( + timestamp=to_timestamp_pb(time_a), + value=10, + ), + DoubleValue( + timestamp=to_timestamp_pb(time_b), + value=11, + ), + ], + ) + + raw_values = Any() + raw_values.Pack(double_values) + + deserialized_data = try_deserialize_channel_data(raw_values) + + assert len(deserialized_data) == 1 + + metadata, time_series = deserialized_data[0] + + assert metadata.data_type == CHANNEL_DATA_TYPE_DOUBLE + assert metadata.channel.name == "double-channel" + assert len(time_series.time_column) == 2 + assert len(time_series.value_column) == 2 + assert time_series.value_column[0] == 10 + assert time_series.value_column[1] == 11 + assert time_series.time_column[0] == to_timestamp_nanos(time_a) + assert time_series.time_column[1] == to_timestamp_nanos(time_b) + + +def test_try_deserialize_channel_data_bit_field_elements(): + metadata = Metadata( + data_type=CHANNEL_DATA_TYPE_BIT_FIELD, + channel=Metadata.Channel( + name="gpio", + bit_field_elements=[ + ChannelBitFieldElement( + name="12v", + index=0, + bit_count=4, + ), + ChannelBitFieldElement( + name="heater", + index=4, + bit_count=4, + ), + ], + ), + ) + + time_a = "2024-07-04T18:09:08.555-07:00" + + value_a = int("10000001", 2) + value_b = int("11110001", 2) + + bit_field_values = BitFieldValues( + metadata=metadata, + values=[ + BitFieldElementValues( + name="12v", + values=[ + BitFieldValue( + timestamp=to_timestamp_pb(time_a), + value=value_a, + ) + ], + ), + BitFieldElementValues( + name="heater", + values=[ + BitFieldValue( + timestamp=to_timestamp_pb(time_a), + value=value_b, + ) + ], + ), + ], + ) + + raw_values = Any() + raw_values.Pack(bit_field_values) + + deserialized_data = try_deserialize_channel_data(raw_values) + + assert len(deserialized_data) == 2 + + metadata_12v, time_series_12v = deserialized_data[0] + metadata_heater, time_series_heater = deserialized_data[1] + + assert 
metadata_12v.data_type == CHANNEL_DATA_TYPE_BIT_FIELD + assert metadata_heater.data_type == CHANNEL_DATA_TYPE_BIT_FIELD + + assert metadata_12v.channel.name == "gpio.12v" + assert metadata_heater.channel.name == "gpio.heater" + + assert len(time_series_12v.time_column) == 1 + assert len(time_series_12v.value_column) == 1 + assert len(time_series_heater.time_column) == 1 + assert len(time_series_heater.value_column) == 1 + + assert time_series_12v.value_column[0] == value_a + assert time_series_heater.value_column[0] == value_b + assert time_series_12v.time_column[0] == to_timestamp_nanos(time_a) + assert time_series_heater.time_column[0] == to_timestamp_nanos(time_a) diff --git a/python/lib/sift_py/data/_validate.py b/python/lib/sift_py/data/_validate.py new file mode 100644 index 00000000..145cb390 --- /dev/null +++ b/python/lib/sift_py/data/_validate.py @@ -0,0 +1,10 @@ +import re + +CHANNEL_REFERENCE_REGEX = re.compile(r"^\$\d+$") + + +def validate_channel_reference(ref: str): + if CHANNEL_REFERENCE_REGEX.match(ref) is None: + raise ValueError( + f"Invalid channel reference key '{ref}'. Expected an integer prefixed with '$' e.g. '$1', '$2', and so on." + ) diff --git a/python/lib/sift_py/data/error.py b/python/lib/sift_py/data/error.py new file mode 100644 index 00000000..156cc32b --- /dev/null +++ b/python/lib/sift_py/data/error.py @@ -0,0 +1,5 @@ +class DataError(Exception): + msg: str + + def __init__(self, msg: str): + super().__init__(msg) diff --git a/python/lib/sift_py/data/query.py b/python/lib/sift_py/data/query.py new file mode 100644 index 00000000..2a77b095 --- /dev/null +++ b/python/lib/sift_py/data/query.py @@ -0,0 +1,299 @@ +""" +Module containing utilities to construct a data query which is ultimately +passed to `sift_py.data.service.DataService.execute` to download telemetry. + +This module also contains types that represent the result of a data query +which can be easily converted into a `pandas` data frame or series. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union, cast + +import pandas as pd +from google.protobuf.timestamp_pb2 import Timestamp as TimestampPb +from typing_extensions import NotRequired, TypeAlias + +from sift_py._internal.channel import channel_fqn +from sift_py._internal.time import to_timestamp_nanos +from sift_py.data._channel import ChannelTimeSeries +from sift_py.ingestion.channel import ChannelDataType + + +class DataQuery: + """ + A query that is meant to be passed to `sift_py.data.service.DataService.execute` to + retrieve telemetry. + + - `asset_name`: The name of the asset to query telemetry for. + - `start_time`: The start time of the time range of the data to request. + - `end_time`: The end time of the time range of the data to request. + - `sample_ms`: + The sampling rate to use when retrieving data. The lower the sampling rate, the + greater the data-fidelity. A sampling rate of `0` retrieves full-fidelity data. + - `channels`: + List of either `ChannelQuery` or `CalculatedChannelQuery`, but not both. Represents the + channels to retrieve data from. 
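+
+    For example, a query for a single channel using a 100 ms sampling rate might look like the
+    following (asset, channel, and times are illustrative):
+
+    ```python
+    query = DataQuery(
+        asset_name="NostromoLV426",
+        start_time="2024-07-04T18:09:08.555-07:00",
+        end_time="2024-07-04T18:09:11.556-07:00",
+        sample_ms=100,
+        channels=[ChannelQuery(channel_name="voltage")],
+    )
+    ```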
+ """ + + DEFAULT_PAGE_SIZE = 100_000 + + asset_name: str + start_time: pd.Timestamp + end_time: pd.Timestamp + sample_ms: int + page_size: int + channels: List[Union[ChannelQuery, CalculatedChannelQuery]] + + def __init__( + self, + asset_name: str, + start_time: Union[pd.Timestamp, TimestampPb, datetime, str, int], + end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int], + channels: List[Union[ChannelQuery, CalculatedChannelQuery]], + sample_ms: int = 0, + # Currently not in use outside of testing purposes. + _: int = DEFAULT_PAGE_SIZE, + ): + self.start_time = to_timestamp_nanos(start_time) + self.end_time = to_timestamp_nanos(end_time) + self.asset_name = asset_name + self.sample_ms = sample_ms + self.channels = channels + self.page_size = self.__class__.DEFAULT_PAGE_SIZE + + +""" +Either the fully qualified channel name or a tuple of the fully qualified +channel name as well as the channel's type. +""" +ChannelLookupInfo: TypeAlias = Union[str, Tuple[str, ChannelDataType]] + + +class DataQueryResult: + """ + The result of a data query which can contain multiple channels. + """ + + _result: Dict[str, List[ChannelTimeSeries]] + + def __init__(self, merged_channel_data: Dict[str, List[ChannelTimeSeries]]): + self._result = merged_channel_data + + def channel(self, lookup: ChannelLookupInfo) -> Optional[DataQueryResultSet]: + """ + Like `channels` but returns a single `DataQueryResultSet`. + """ + + result = self.channels(lookup) + + if len(result) > 0: + return result[0] + + return None + + def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]: + """ + Returns a `sift_py.data.channel.ChannelTimeSeries` given the `lookup` argument. + If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels + with the same FQN, this will raise a `ValueError`. In these situations, `lookup` must + be a tuple where the first item is the channel FQN and the second the + `sift_py.ingestion.channel.ChannelDataType`. + + If `lookup` is a tuple, then the channel data-type will be appended to the key referencing + the `sift_py.data.channel.ChannelTimeSeries`. + """ + + result: List[DataQueryResultSet] = [] + + for info in lookup: + if isinstance(info, str): + time_series = self._result.get(info) + + if not time_series: + continue + if len(time_series) > 1: + raise ValueError( + f"Ambiguous lookup provided: '{info}' is associated with {len(time_series)} channels." + ) + + series = time_series[0] + result.append( + DataQueryResultSet( + identifier=info, + timestamps=series.time_column, + values=series.value_column, + ) + ) + else: + fqn, data_type = cast(Tuple[str, ChannelDataType], info) + identifier = f"{fqn}.{data_type.as_human_str()}" + + time_series = self._result.get(fqn) + + if not time_series: + continue + if len(time_series) == 1: + series = time_series[0] + result.append( + DataQueryResultSet( + identifier=identifier, + timestamps=series.time_column, + values=series.value_column, + ) + ) + continue + + for series in time_series: + if series.data_type == data_type: + result.append( + DataQueryResultSet( + identifier=identifier, + timestamps=series.time_column, + values=series.value_column, + ) + ) + break + + return result + + def all_channels(self) -> List[DataQueryResultSet]: + """ + Returns all channel data. 
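+
+        Each entry can be fed directly into a `pandas` data frame, e.g. (a sketch, where
+        `result` is a `DataQueryResult`):
+
+        ```python
+        frames = [pd.DataFrame(rs.columns()) for rs in result.all_channels()]
+        ```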
+ """ + + result = [] + + for fqn, time_series in self._result.items(): + if len(time_series) > 1: + for series in time_series: + human_data_type = series.data_type.as_human_str() + fqn_extended = f"{fqn}.{human_data_type}" + + result.append( + DataQueryResultSet( + identifier=fqn_extended, + timestamps=series.time_column, + values=series.value_column, + ) + ) + continue + + for series in time_series: + result.append( + DataQueryResultSet( + identifier=fqn, + timestamps=series.time_column, + values=series.value_column, + ) + ) + + return result + + +class DataQueryResultSet: + """ + Represents time series data for a single channel. Can easily be converted into a `pandas` data frame like so: + + ```python + pd.DataFrame(data_query_result_set.all_columns()) + ``` + + """ + + identifier: str + timestamps: List[pd.Timestamp] + values: List[Any] + + def __init__(self, identifier: str, timestamps: List[pd.Timestamp], values: List[Any]): + self.identifier = identifier + self.timestamps = timestamps + self.values = values + + def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]: + """ + Returns a single key-value pair dictionary meant to represent the value column of the data-set. + `column_name` can be used to override the name of the column. + """ + + if column_name is None: + return {self.identifier: self.values} + else: + return {column_name: self.values} + + def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]: + """ + Returns a single key-value pair dictionary meant to represent the time column of the data-set. + `column_name` can be used to override the name of the column. + """ + if column_name is None: + return {"time": self.timestamps} + else: + return {column_name: self.timestamps} + + def columns( + self, + time_column_name: Optional[str] = None, + value_column_name: Optional[str] = None, + ) -> Dict[str, List[Any]]: + """ + Returns both the time and value columns with options to override the column names. + """ + + cols = self.time_column(time_column_name) + cols.update(self.value_column(value_column_name)) + return cols + + +class ChannelQuery: + """ + Represents a single channel to include in the `sift_py.data.query.DataQuery`. + """ + + channel_name: str + component: Optional[str] + run_name: Optional[str] + + def __init__( + self, + channel_name: str, + component: Optional[str] = None, + run_name: Optional[str] = None, + ): + self.channel_name = channel_name + self.component = component + self.run_name = run_name + + def fqn(self) -> str: + return channel_fqn(self.channel_name, self.component) + + +class ExpressionChannelReference(TypedDict): + reference: str + channel_name: str + component: NotRequired[str] + data_type: NotRequired[ChannelDataType] + + +class CalculatedChannelQuery: + """ + Represents a single calculated channel to include in the `sift_py.data.query.DataQuery`. 
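+
+    For example (expression and names are illustrative):
+
+    ```python
+    CalculatedChannelQuery(
+        channel_key="calc-voltage",
+        expression="$1 + 10",
+        expression_channel_references=[
+            {"reference": "$1", "channel_name": "voltage"},
+        ],
+    )
+    ```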
+ """ + + channel_key: str + expression: str + expression_channel_references: List[ExpressionChannelReference] + run_name: Optional[str] + + def __init__( + self, + channel_key: str, + expression: str, + expression_channel_references: List[ExpressionChannelReference], + run_name: Optional[str] = None, + ): + self.channel_key = channel_key + self.run_name = run_name + self.expression = expression + self.expression_channel_references = expression_channel_references diff --git a/python/lib/sift_py/data/service.py b/python/lib/sift_py/data/service.py new file mode 100644 index 00000000..cf939953 --- /dev/null +++ b/python/lib/sift_py/data/service.py @@ -0,0 +1,492 @@ +import asyncio +from collections import defaultdict +from typing import Dict, Iterable, List, Optional, Set, Tuple, Union, cast + +from google.protobuf.any_pb2 import Any +from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse +from sift.assets.v1.assets_pb2_grpc import AssetServiceStub +from sift.calculated_channels.v1.calculated_channels_pb2 import ( + ExpressionChannelReference, + ExpressionRequest, +) +from sift.calculated_channels.v1.calculated_channels_pb2_grpc import CalculatedChannelsServiceStub +from sift.channels.v2.channels_pb2 import Channel, ListChannelsRequest, ListChannelsResponse +from sift.channels.v2.channels_pb2_grpc import ChannelServiceStub +from sift.data.v1.data_pb2 import CalculatedChannelQuery as CalculatedChannelQueryPb +from sift.data.v1.data_pb2 import ChannelQuery as ChannelQueryPb +from sift.data.v1.data_pb2 import GetDataRequest, GetDataResponse, Query +from sift.data.v1.data_pb2_grpc import DataServiceStub +from sift.runs.v2.runs_pb2 import ListRunsRequest, ListRunsResponse, Run +from sift.runs.v2.runs_pb2_grpc import RunServiceStub +from typing_extensions import TypeAlias + +from sift_py._internal.cel import cel_in +from sift_py._internal.channel import channel_fqn +from sift_py._internal.convert.timestamp import to_pb_timestamp +from sift_py.data._channel import ChannelTimeSeries +from sift_py.data._deserialize import try_deserialize_channel_data +from sift_py.data._validate import validate_channel_reference +from sift_py.data.error import DataError +from sift_py.data.query import CalculatedChannelQuery, ChannelQuery, DataQuery, DataQueryResult +from sift_py.error import SiftError +from sift_py.grpc.transport import SiftAsyncChannel +from sift_py.ingestion.channel import ChannelDataType + + +class DataService: + """ + A service that asynchronously executes a `sift_py.data.query.DataQuery` to retrieve telemetry + for an arbitrary amount of channels (or calculated channels) within a user-specified time-range + and sampling rate. + """ + + # TODO: There is a pagination issue API side when requesting multiple channels in single request. + # If all data points for all channels in a single request don't fit into a single page, then + # paging seems to omit all but a single channel. We can increase this batch size once that issue + # has been resolved. In the mean time each channel gets its own request. 
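+    #
+    # With a batch size of 1, `_batch_queries` puts each `Query` into its own batch, and `execute`
+    # issues one `GetDataRequest` per batch concurrently via `asyncio.gather`.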
+ REQUEST_BATCH_SIZE = 1 + + AssetName: TypeAlias = str + ChannelFqn: TypeAlias = str + RunName: TypeAlias = str + + _asset_service_stub: AssetServiceStub + _channel_service_stub: ChannelServiceStub + _calculated_channel_service_stub: CalculatedChannelsServiceStub + _data_service_stub: DataServiceStub + _run_service_stub: RunServiceStub + + _cached_assets: Dict[AssetName, Asset] + _cached_channels: Dict[AssetName, Dict[ChannelFqn, List[Channel]]] + _cached_runs: Dict[RunName, Run] + + def __init__(self, channel: SiftAsyncChannel): + self._asset_service_stub = AssetServiceStub(channel) + self._channel_service_stub = ChannelServiceStub(channel) + self._calculated_channel_service_stub = CalculatedChannelsServiceStub(channel) + self._data_service_stub = DataServiceStub(channel) + self._run_service_stub = RunServiceStub(channel) + + self._cached_assets = {} + self._cached_channels = {} + self._cached_runs = {} + + async def execute(self, query: DataQuery, bust_cache: bool = False) -> DataQueryResult: + """ + Performs the actual query to retrieve telemetry. + """ + + if bust_cache: + self._bust_cache() + + asset = await self._load_asset(query.asset_name) + + channel_fqns = [] + + for c in query.channels: + if isinstance(c, ChannelQuery): + channel_fqns.append(channel_fqn(c.channel_name, c.component)) + elif isinstance(c, CalculatedChannelQuery): + for ref in c.expression_channel_references: + channel_fqns.append(channel_fqn(ref["channel_name"], ref.get("component"))) + + channels = await self._load_channels(asset, channel_fqns) + runs = await self._load_runs(query.channels) + + queries: List[Query] = [] + + for channel_query in query.channels: + if isinstance(channel_query, ChannelQuery): + fqn = channel_query.fqn() + run_name = channel_query.run_name + targets = channels.get(fqn) + + if not targets: + raise SiftError( + f"An unexpected error occurred. Expected channel '{fqn}' to have been loaded." + ) + cqueries = [ChannelQueryPb(channel_id=channel.channel_id) for channel in targets] + + if run_name is not None: + run = runs.get(run_name) + + if run is None: + raise SiftError( + f"An unexpected error occurred. Expected run '{run_name}' to have been loaded." + ) + + for cquery in cqueries: + cquery.run_id = run.run_id + + for cquery in cqueries: + queries.append(Query(channel=cquery)) + + elif isinstance(channel_query, CalculatedChannelQuery): + expression_channel_references = [] + + for expr_ref in channel_query.expression_channel_references: + validate_channel_reference(expr_ref["reference"]) + + fqn = channel_fqn(expr_ref["channel_name"], expr_ref.get("component")) + + targets = channels.get(fqn) + + if not targets: + raise SiftError( + f"An unexpected error occurred. Expected channel '{fqn}' to have been loaded." + ) + + channel_id = targets[0].channel_id + + if len(targets) > 1: + target_data_type = expr_ref.get("data_type") + + if target_data_type is None: + raise ValueError( + f"Found multiple channels with the fully qualified name '{fqn}'. A 'data_type' must be provided in `ExpressionChannelReference`." 
+ ) + + for target in targets: + if ChannelDataType.from_pb(target.data_type) == target_data_type: + channel_id = target.channel_id + break + + expression_channel_references.append( + ExpressionChannelReference( + channel_reference=expr_ref["reference"], channel_id=channel_id + ) + ) + + expression_request = ExpressionRequest( + expression=channel_query.expression, + expression_channel_references=expression_channel_references, + ) + + calculated_cquery = CalculatedChannelQueryPb( + channel_key=channel_query.channel_key, + expression=expression_request, + ) + + run_name = channel_query.run_name + + if run_name is not None: + run = runs.get(run_name) + + if run is None: + raise SiftError( + f"An unexpected error occurred. Expected run '{run_name}' to have been loaded." + ) + + calculated_cquery.run_id = run.run_id + + queries.append(Query(calculated_channel=calculated_cquery)) + + else: + raise DataError("Unknown channel query type.") + + await self._validate_queries(queries) + + start_time = to_pb_timestamp(query.start_time) + end_time = to_pb_timestamp(query.end_time) + sample_ms = query.sample_ms + page_size = query.page_size + + tasks = [] + + for batch in self._batch_queries(queries): + req = GetDataRequest( + start_time=start_time, + end_time=end_time, + sample_ms=sample_ms, + page_size=page_size, + queries=batch, + ) + task = asyncio.create_task(self._get_data(req)) + tasks.append(task) + + data_pages: List[Iterable[Any]] = [] + + for pages in await asyncio.gather(*tasks): + # Empty pages will have no effect + data_pages.extend(pages) + + return DataQueryResult(self._merge_and_sort_pages(data_pages)) + + async def _get_data(self, req: GetDataRequest) -> List[Iterable[Any]]: + pages: List[Iterable[Any]] = [] + + start_time = req.start_time + end_time = req.end_time + sample_ms = req.sample_ms + page_size = req.page_size + queries = req.queries + next_page_token = "" + + while True: + next_page_req = GetDataRequest( + start_time=start_time, + end_time=end_time, + sample_ms=sample_ms, + page_size=page_size, + queries=queries, + page_token=next_page_token, + ) + response = cast(GetDataResponse, await self._data_service_stub.GetData(next_page_req)) + + pages.append(response.data) + next_page_token = response.next_page_token + + if len(next_page_token) == 0: + break + + return pages + + def _merge_and_sort_pages( + self, pages: List[Iterable[Any]] + ) -> Dict[str, List[ChannelTimeSeries]]: + if len(pages) == 0: + return {} + + merged_values_by_channel: Dict[str, List[ChannelTimeSeries]] = {} + + for page in pages: + for raw_channel_values in page: + parsed_channel_data = try_deserialize_channel_data(cast(Any, raw_channel_values)) + + for metadata, cvalues in parsed_channel_data: + channel = metadata.channel + fqn = channel_fqn(channel.name, channel.component) + + if not fqn: + fqn = channel.channel_id + + time_series = merged_values_by_channel.get(fqn) + + if time_series is None: + merged_values_by_channel[fqn] = [ + ChannelTimeSeries( + data_type=cvalues.data_type, + time_column=cvalues.time_column, + value_column=cvalues.value_column, + ), + ] + else: + for series in time_series: + if series.data_type == cvalues.data_type: + series.time_column.extend(cvalues.time_column) + series.value_column.extend(cvalues.value_column) + break + else: # for-else + # Situation in which multiple channels with identical fully-qualified names but different types. 
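+                            # In that case, keep a separate time series per data type rather than merging.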
+ time_series.append( + ChannelTimeSeries( + data_type=cvalues.data_type, + time_column=cvalues.time_column, + value_column=cvalues.value_column, + ) + ) + + for data in merged_values_by_channel.values(): + for channel_data in data: + channel_data.sort_time_series() + + return merged_values_by_channel + + def _bust_cache(self): + self._cached_assets.clear() + self._cached_channels.clear() + self._cached_runs.clear() + + async def _load_asset(self, asset_name: str) -> Asset: + asset = self._cached_assets.get(asset_name) + + if asset is None: + asset = await self._get_asset_by_name(asset_name) + self._cached_assets[asset.name] = asset + + return asset + + async def _load_channels( + self, asset: Asset, channel_fqns: List[str] + ) -> Dict[ChannelFqn, List[Channel]]: + if self._cached_channels.get(asset.name) is None: + sift_channels = await self._get_channels_by_asset_id_and_channel_fqns( + asset.asset_id, channel_fqns + ) + + channels = defaultdict(list) + + for c in sift_channels: + channels[channel_fqn(c.name, c.component)].append(c) + + self._cached_channels[asset.name] = channels + return self._cached_channels[asset.name] + + chans = self._cached_channels[asset.name] + + channels_to_retrieve = [] + + for fqn in channel_fqns: + if chans.get(fqn) is None: + channels_to_retrieve.append(fqn) + + sift_channels = await self._get_channels_by_asset_id_and_channel_fqns( + asset.asset_id, channels_to_retrieve + ) + + channels = defaultdict(list) + + for c in sift_channels: + channels[channel_fqn(c.name, c.component)].append(c) + + self._cached_channels[asset.name].update(channels) + + return self._cached_channels[asset.name] + + async def _load_runs( + self, channel_queries: List[Union[ChannelQuery, CalculatedChannelQuery]] + ) -> Dict[RunName, Run]: + run_names: Set[str] = set() + + for channel_query in channel_queries: + run_name = channel_query.run_name + + if run_name is not None and len(run_name) > 0: + run_names.add(run_name) + + runs = {} + run_names_to_fetch = set() + + for run_name in run_names: + run = self._cached_runs.get(run_name) + + if run is not None: + runs[run.name] = run + else: + run_names_to_fetch.add(run_name) + + for run in await self._get_runs_by_names(run_names_to_fetch): + self._cached_runs[run.name] = run + runs[run.name] = run + + return runs + + async def _get_asset_by_name(self, asset_name: str) -> Asset: + req = ListAssetsRequest( + filter=f'name=="{asset_name}"', + page_size=1, + ) + res = cast(ListAssetsResponse, await self._asset_service_stub.ListAssets(req)) + assets = res.assets + + if len(assets) == 0: + raise DataError(f"Asset of name '{asset_name}' does not exist.") + + return res.assets[0] + + async def _get_runs_by_names(self, run_names: Set[str]) -> List[Run]: + if len(run_names) == 0: + return [] + + runs: List[Run] = [] + + filter = cel_in("name", run_names) + page_size = 1_000 + next_page_token = "" + + while True: + req = ListRunsRequest( + filter=filter, + page_size=page_size, + page_token=next_page_token, + ) + res = cast(ListRunsResponse, await self._run_service_stub.ListRuns(req)) + runs.extend(res.runs) + + next_page_token = res.next_page_token + + if len(next_page_token) == 0: + break + + seen_sift_runs = set() + + for sift_run in runs: + seen_sift_runs.add(sift_run.name) + + for run_name in run_names: + if run_name not in seen_sift_runs: + raise DataError(f"Run of name '{run_name}' does not exist.") + + return runs + + async def _get_channels_by_asset_id_and_channel_fqns( + self, asset_id: str, channel_fqns: List[str] + ) -> List[Channel]: + if 
len(asset_id) == 0: + return [] + + channels: List[Channel] = [] + + channel_names = [] + + for fqn in channel_fqns: + channel_names.append(fqn.split(".")[-1]) + + name_in = cel_in("name", channel_names) + + filter = f'asset_id=="{asset_id}" && {name_in}' + page_size = 1_000 + next_page_token = "" + + while True: + req = ListChannelsRequest( + filter=filter, + page_size=page_size, + page_token=next_page_token, + ) + res = cast(ListChannelsResponse, await self._channel_service_stub.ListChannels(req)) + channels.extend(res.channels) + next_page_token = res.next_page_token + + if len(next_page_token) == 0: + break + + return channels + + def _batch_queries(self, queries: List[Query]) -> List[List[Query]]: + if len(queries) == 0: + return [] + + batches: List[List[Query]] = [] + batch_size = self.__class__.REQUEST_BATCH_SIZE + + for i in range(0, len(queries), batch_size): + batches.append(queries[i : i + batch_size]) + + return batches + + async def _validate_queries(self, queries: List[Query]): + queries_to_validate: List[ExpressionRequest] = [] + + for query in queries: + if query.HasField("calculated_channel"): + queries_to_validate.append(query.calculated_channel.expression) + + if len(queries_to_validate) > 0: + tasks = [] + + for to_validate in queries_to_validate: + task = asyncio.create_task(self._validate_expression(to_validate)) + tasks.append(task) + + for result in await asyncio.gather(*tasks): + if result is not None: + expr, err = result + raise ValueError(f"Encountered an invalid expression '{expr}': {err}") + + async def _validate_expression(self, req: ExpressionRequest) -> Optional[Tuple[str, Exception]]: + try: + self._calculated_channel_service_stub.ValidateExpression(req) + return None + except Exception as err: + return (req.expression, err) diff --git a/python/lib/sift_py/data/service_test.py b/python/lib/sift_py/data/service_test.py new file mode 100644 index 00000000..f024ea8a --- /dev/null +++ b/python/lib/sift_py/data/service_test.py @@ -0,0 +1,224 @@ +from contextlib import contextmanager +from datetime import datetime, timedelta, timezone +from typing import Dict, Iterator + +import pytest +from google.protobuf.any_pb2 import Any +from pytest_mock import MockFixture, MockType +from sift.assets.v1.assets_pb2 import Asset +from sift.channels.v2.channels_pb2 import Channel +from sift.common.type.v1.channel_bit_field_element_pb2 import ChannelBitFieldElement +from sift.common.type.v1.channel_data_type_pb2 import ( + CHANNEL_DATA_TYPE_BIT_FIELD, + CHANNEL_DATA_TYPE_DOUBLE, +) +from sift.data.v1.data_pb2 import ( + BitFieldElementValues, + BitFieldValue, + BitFieldValues, + DoubleValue, + DoubleValues, + Metadata, +) +from sift.runs.v2.runs_pb2 import Run + +from sift_py._internal.test_util.channel import MockAsyncChannel +from sift_py._internal.time import to_timestamp_pb +from sift_py.data.query import ChannelQuery, DataQuery +from sift_py.data.service import DataService + + +@pytest.mark.asyncio +async def test_data_service_execute_regular_channels(mocker: MockFixture): + with patch_grpc_calls_channels(mocker) as mocks: + channel = MockAsyncChannel() + data_service = DataService(channel) + + start_time = datetime.now(timezone.utc) + end_time = start_time + timedelta(minutes=2) + + query = DataQuery( + asset_name="NostromoLV428", + start_time=start_time, + end_time=end_time, + sample_ms=0, + channels=[ + ChannelQuery( + channel_name="velocity", + component="mainmotor", + run_name="[NostromoLV426].1720141748.047512", + ), + ChannelQuery( + channel_name="gpio", + 
run_name="[NostromoLV426].1720141748.047512", + ), + ], + ) + + result = await data_service.execute(query) + + mock_get_asset = mocks["mock_get_asset_by_name"] + mock_get_channels = mocks["mock_get_channels_by_asset_id_and_channel_fqns"] + mock_get_runs = mocks["mock_get_runs_by_names"] + + mock_get_asset.assert_called_once() + mock_get_channels.assert_called_once() + mock_get_runs.assert_called_once() + + # bit field elements count as separate channels + assert len(result.all_channels()) == 3 + assert not result.channel("velocity") + assert not result.channels("velocity") + assert len(result.channels("mainmotor.velocity")) == 1 + + velocity = result.channel("mainmotor.velocity") + assert velocity is not None + assert len(velocity.timestamps) == 2 + assert len(velocity.time_column()["time"]) == 2 + assert len(velocity.time_column("custom_column_name")["custom_column_name"]) == 2 + assert len(velocity.value_column()["mainmotor.velocity"]) == 2 + assert len(velocity.value_column("custom_column_name")["custom_column_name"]) == 2 + + all_columns = velocity.columns() + assert len(all_columns) == 2 + assert len(all_columns["time"]) == 2 + assert len(all_columns["mainmotor.velocity"]) == 2 + + all_columns_custom = velocity.columns( + time_column_name="ts", + value_column_name="velocity", + ) + assert len(all_columns_custom) == 2 + assert len(all_columns_custom["ts"]) == 2 + assert len(all_columns_custom["velocity"]) == 2 + + gpio = result.channel("gpio") + assert not gpio + + gpio_12v = result.channel("gpio.12v") + assert gpio_12v is not None + assert len(gpio_12v.timestamps) == 1 + assert len(gpio_12v.time_column()["time"]) == 1 + assert len(gpio_12v.time_column("custom_column_name")["custom_column_name"]) == 1 + assert len(gpio_12v.value_column()["gpio.12v"]) == 1 + assert len(gpio_12v.value_column("12v")["12v"]) == 1 + + gpio_heater = result.channel("gpio.heater") + assert gpio_heater is not None + assert len(gpio_heater.timestamps) == 1 + assert len(gpio_heater.time_column()["time"]) == 1 + assert len(gpio_heater.time_column("custom_column_name")["custom_column_name"]) == 1 + assert len(gpio_heater.value_column()["gpio.heater"]) == 1 + assert len(gpio_heater.value_column("heater")["heater"]) == 1 + + +@contextmanager +def patch_grpc_calls_channels(mocker: MockFixture) -> Iterator[Dict[str, MockType]]: + mock__get_asset_by_name = mocker.patch.object(DataService, "_get_asset_by_name") + mock__get_asset_by_name.return_value = Asset( + asset_id="b7955799-9893-4acf-bf14-50052284020c", name="NostromoLV428" + ) + + mock__get_channels_by_asset_id_and_channel_fqns = mocker.patch.object( + DataService, "_get_channels_by_asset_id_and_channel_fqns" + ) + mock__get_channels_by_asset_id_and_channel_fqns.return_value = [ + Channel( + channel_id="e8662647-12f7-465f-85dc-cb02513944e0", + name="velocity", + component="mainmotor", + data_type=CHANNEL_DATA_TYPE_DOUBLE, + ), + Channel( + channel_id="97e25141-ed3e-4538-b063-c3eac30838ce", + name="gpio", + data_type=CHANNEL_DATA_TYPE_BIT_FIELD, + ), + ] + + mock__get_runs_by_names = mocker.patch.object(DataService, "_get_runs_by_names") + mock__get_runs_by_names.return_value = [ + Run( + run_id="9b7f6c5f-cabc-4481-b048-6f12fc6b5b68", + name="[NostromoLV426].1720141748.047512", + ) + ] + + time_a = "2024-07-04T18:09:08.555-07:00" + time_b = "2024-07-04T18:09:09.555-07:00" + + double_values = DoubleValues( + metadata=Metadata( + data_type=CHANNEL_DATA_TYPE_DOUBLE, + channel=Metadata.Channel(name="velocity", component="mainmotor"), + ), + values=[ + DoubleValue( + 
timestamp=to_timestamp_pb(time_a), + value=10, + ), + DoubleValue( + timestamp=to_timestamp_pb(time_b), + value=11, + ), + ], + ) + + raw_double_values = Any() + raw_double_values.Pack(double_values) + + bit_field_values = BitFieldValues( + metadata=Metadata( + data_type=CHANNEL_DATA_TYPE_BIT_FIELD, + channel=Metadata.Channel( + name="gpio", + bit_field_elements=[ + ChannelBitFieldElement( + name="12v", + index=0, + bit_count=4, + ), + ChannelBitFieldElement( + name="heater", + index=4, + bit_count=4, + ), + ], + ), + ), + values=[ + BitFieldElementValues( + name="12v", + values=[ + BitFieldValue( + timestamp=to_timestamp_pb(time_a), + value=int("10000001", 2), + ) + ], + ), + BitFieldElementValues( + name="heater", + values=[ + BitFieldValue( + timestamp=to_timestamp_pb(time_a), + value=int("11110001", 2), + ) + ], + ), + ], + ) + + raw_bit_field_values = Any() + raw_bit_field_values.Pack(bit_field_values) + + mock__get_data = mocker.patch.object(DataService, "_get_data") + mock__get_data.side_effect = [ + [[raw_double_values]], + [[raw_bit_field_values]], + ] + yield { + "mock_get_asset_by_name": mock__get_asset_by_name, + "mock_get_runs_by_names": mock__get_runs_by_names, + "mock_get_channels_by_asset_id_and_channel_fqns": mock__get_channels_by_asset_id_and_channel_fqns, + "mock_get_data": mock__get_data, + } diff --git a/python/lib/sift_py/error.py b/python/lib/sift_py/error.py new file mode 100644 index 00000000..79646030 --- /dev/null +++ b/python/lib/sift_py/error.py @@ -0,0 +1,11 @@ +class SiftError(Exception): + """ + These exceptions are raised when something totally unexpected occurs and is + meant to indicate that the error is likely not caused by the user, but rather, + the library itself. These errors should be reported to Sift. + """ + + msg: str + + def __init__(self, msg: str): + super().__init__(f"{msg}\nPlease notify Sift.") diff --git a/python/lib/sift_py/grpc/_async_interceptors/__init__.py b/python/lib/sift_py/grpc/_async_interceptors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/lib/sift_py/grpc/_async_interceptors/base.py b/python/lib/sift_py/grpc/_async_interceptors/base.py new file mode 100644 index 00000000..3ce05299 --- /dev/null +++ b/python/lib/sift_py/grpc/_async_interceptors/base.py @@ -0,0 +1,72 @@ +from abc import abstractmethod +from typing import Any, AsyncIterable, Callable, Iterable, TypeVar, Union + +from grpc import aio as grpc_aio + +CallType = TypeVar("CallType", bound=grpc_aio.Call) +Continuation = Callable[[grpc_aio.ClientCallDetails, Any], CallType] + + +class ClientAsyncInterceptor( + grpc_aio.UnaryUnaryClientInterceptor, + grpc_aio.UnaryStreamClientInterceptor, + grpc_aio.StreamUnaryClientInterceptor, + grpc_aio.StreamStreamClientInterceptor, +): + @abstractmethod + async def intercept( + self, + method: Callable, + request_or_iterator: Any, + client_call_details: grpc_aio.ClientCallDetails, + ) -> Any: + pass + + async def intercept_unary_unary( + self, + continuation: Continuation[grpc_aio.UnaryUnaryCall], + client_call_details: grpc_aio.ClientCallDetails, + request: Any, + ): + return await self.intercept(_async_swap_args(continuation), request, client_call_details) + + async def intercept_unary_stream( + self, + continuation: Continuation[grpc_aio.UnaryStreamCall], + client_call_details: grpc_aio.ClientCallDetails, + request: Any, + ): + return await self.intercept(_async_swap_args(continuation), request, client_call_details) + + async def intercept_stream_unary( + self, + continuation: 
Continuation[grpc_aio.StreamUnaryCall],
+        client_call_details: grpc_aio.ClientCallDetails,
+        request_iterator: Union[Iterable[Any], AsyncIterable[Any]],
+    ):
+        return await self.intercept(
+            _async_swap_args(continuation), request_iterator, client_call_details
+        )
+
+    async def intercept_stream_stream(
+        self,
+        continuation: Continuation[grpc_aio.StreamStreamCall],
+        client_call_details: grpc_aio.ClientCallDetails,
+        request_iterator: Union[Iterable[Any], AsyncIterable[Any]],
+    ):
+        return await self.intercept(
+            _async_swap_args(continuation), request_iterator, client_call_details
+        )
+
+
+def _async_swap_args(fn: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
+    """
+    Continuations are typed such that the call details are the first argument and the request is the second.
+    Code generated from protobuf, however, takes the request first and then the details, which is a quirk of
+    the grpc library. This utility simply flips the arguments.
+    """
+
+    async def new_fn(x, y):
+        return await fn(y, x)
+
+    return new_fn
diff --git a/python/lib/sift_py/grpc/_async_interceptors/metadata.py b/python/lib/sift_py/grpc/_async_interceptors/metadata.py
new file mode 100644
index 00000000..0592c364
--- /dev/null
+++ b/python/lib/sift_py/grpc/_async_interceptors/metadata.py
@@ -0,0 +1,36 @@
+from __future__ import annotations
+
+from typing import Any, Callable, List, Tuple, cast
+
+from grpc import aio as grpc_aio
+
+from sift_py.grpc._async_interceptors.base import ClientAsyncInterceptor
+
+Metadata = List[Tuple[str, str]]
+
+
+class MetadataAsyncInterceptor(ClientAsyncInterceptor):
+    """
+    Interceptor to add metadata to all async unary and streaming RPCs.
+    """
+
+    metadata: Metadata
+
+    def __init__(self, metadata: Metadata):
+        self.metadata = metadata
+
+    async def intercept(
+        self,
+        method: Callable,
+        request_or_iterator: Any,
+        client_call_details: grpc_aio.ClientCallDetails,
+    ):
+        call_details = cast(grpc_aio.ClientCallDetails, client_call_details)
+        new_details = grpc_aio.ClientCallDetails(
+            call_details.method,
+            call_details.timeout,
+            self.metadata,
+            call_details.credentials,
+            call_details.wait_for_ready,
+        )
+        return await method(request_or_iterator, new_details)
diff --git a/python/lib/sift_py/grpc/transport.py b/python/lib/sift_py/grpc/transport.py
index 02293c15..87a28f5e 100644
--- a/python/lib/sift_py/grpc/transport.py
+++ b/python/lib/sift_py/grpc/transport.py
@@ -9,13 +9,17 @@
 from typing import Any, List, Tuple, TypedDict
 
 import grpc
+import grpc.aio as grpc_aio
 from grpc_interceptor import ClientInterceptor
 from typing_extensions import NotRequired, TypeAlias
 
+from sift_py.grpc._async_interceptors.base import ClientAsyncInterceptor
+from sift_py.grpc._async_interceptors.metadata import MetadataAsyncInterceptor
 from sift_py.grpc._interceptors import Metadata, MetadataInterceptor
 from sift_py.grpc._retry import RetryPolicy
 
 SiftChannel: TypeAlias = grpc.Channel
+SiftAsyncChannel: TypeAlias = grpc_aio.Channel
 
 
 def use_sift_channel(config: SiftChannelConfig) -> SiftChannel:
@@ -38,6 +42,22 @@ def use_sift_channel(config: SiftChannelConfig) -> SiftChannel:
     return grpc.intercept_channel(channel, *interceptors)
+
+def use_sift_async_channel(config: SiftChannelConfig) -> SiftAsyncChannel:
+    """
+    Like `use_sift_channel` but returns a channel meant to be used within the context
+    of an async runtime when asynchronous I/O is required.
+ """ + if not config.get("use_ssl", True): + return _use_insecure_sift_async_channel(config) + + return grpc_aio.secure_channel( + target=config["uri"], + credentials=grpc.ssl_channel_credentials(), + options=_compute_channel_options(), + interceptors=_compute_sift_async_interceptors(config), + ) + + def _use_insecure_sift_channel(config: SiftChannelConfig) -> SiftChannel: """ FOR DEVELOPMENT PURPOSES ONLY @@ -48,6 +68,17 @@ def _use_insecure_sift_channel(config: SiftChannelConfig) -> SiftChannel: return grpc.intercept_channel(channel, *interceptors) +def _use_insecure_sift_async_channel(config: SiftChannelConfig) -> SiftAsyncChannel: + """ + FOR DEVELOPMENT PURPOSES ONLY + """ + return grpc_aio.insecure_channel( + target=config["uri"], + options=_compute_channel_options(), + interceptors=_compute_sift_async_interceptors(config), + ) + + def _compute_sift_interceptors(config: SiftChannelConfig) -> List[ClientInterceptor]: """ Initialized all interceptors here. @@ -57,6 +88,12 @@ def _compute_sift_interceptors(config: SiftChannelConfig) -> List[ClientIntercep ] +def _compute_sift_async_interceptors(config: SiftChannelConfig) -> List[grpc_aio.ClientInterceptor]: + return [ + _metadata_async_interceptor(config), + ] + + def _compute_channel_options() -> List[Tuple[str, Any]]: """ Initialize all [channel options](https://github.com/grpc/grpc/blob/v1.64.x/include/grpc/impl/channel_arg_names.h) here. @@ -75,6 +112,17 @@ def _metadata_interceptor(config: SiftChannelConfig) -> ClientInterceptor: return MetadataInterceptor(metadata) +def _metadata_async_interceptor(config: SiftChannelConfig) -> ClientAsyncInterceptor: + """ + Any new metadata goes here for unary-unary calls. + """ + apikey = config["apikey"] + metadata: Metadata = [ + ("authorization", f"Bearer {apikey}"), + ] + return MetadataAsyncInterceptor(metadata) + + class SiftChannelConfig(TypedDict): """ Config class used to instantiate a `SiftChannel` via `use_sift_channel`. 
diff --git a/python/lib/sift_py/ingestion/_service_test.py b/python/lib/sift_py/ingestion/_service_test.py index c042383b..34b110ec 100644 --- a/python/lib/sift_py/ingestion/_service_test.py +++ b/python/lib/sift_py/ingestion/_service_test.py @@ -164,6 +164,26 @@ def mock_ctx_manager(): assert mock_ingest.call_count == 6 + with mock_ctx_manager(): + with ingestion_service.buffered_ingestion() as buffered_ingestion: + for _ in range(6_600): + buffered_ingestion.ingest_flows( + { + "flow_name": "readings", + "timestamp": datetime.now(timezone.utc), + "channel_values": [double_value(random.random())], + } + ) + + assert mock_ingest.call_count == 6 + assert len(buffered_ingestion._buffer) == 600 + + with pytest.raises(Exception): + raise + + assert len(buffered_ingestion._buffer) == 0 + assert mock_ingest.call_count == 7 + def test_ingestion_service_modify_existing_channel_configs(mocker: MockFixture): """ diff --git a/python/lib/sift_py/ingestion/channel.py b/python/lib/sift_py/ingestion/channel.py index 6a38be51..7ff0bf7d 100644 --- a/python/lib/sift_py/ingestion/channel.py +++ b/python/lib/sift_py/ingestion/channel.py @@ -16,6 +16,7 @@ from sift.ingestion_configs.v1.ingestion_configs_pb2 import ChannelConfig as ChannelConfigPb from typing_extensions import NotRequired, Self +from sift_py._internal.channel import channel_fqn as _channel_fqn from sift_py._internal.convert.protobuf import AsProtobuf @@ -205,6 +206,30 @@ def from_str(cls, val: str) -> Optional["ChannelDataType"]: return None + def as_human_str(self) -> str: + if self == self.__class__.DOUBLE.value: + return ChannelDataTypeStrRep.DOUBLE.value + elif self == self.__class__.STRING.value: + return ChannelDataTypeStrRep.STRING.value + elif self == self.__class__.ENUM.value: + return ChannelDataTypeStrRep.ENUM.value + elif self == self.__class__.BIT_FIELD.value: + return ChannelDataTypeStrRep.BIT_FIELD.value + elif self == self.__class__.BOOL.value: + return ChannelDataTypeStrRep.BOOL.value + elif self == self.__class__.FLOAT.value: + return ChannelDataTypeStrRep.FLOAT.value + elif self == self.__class__.INT_32.value: + return ChannelDataTypeStrRep.INT_32.value + elif self == self.__class__.INT_64.value: + return ChannelDataTypeStrRep.INT_64.value + elif self == self.__class__.UINT_32.value: + return ChannelDataTypeStrRep.UINT_32.value + elif self == self.__class__.UINT_64.value: + return ChannelDataTypeStrRep.UINT_64.value + else: + raise Exception("Unreachable.") + class ChannelDataTypeStrRep(Enum): DOUBLE = "double" @@ -242,13 +267,6 @@ def channel_fqn(channel: Union[ChannelConfig, ChannelConfigPb, ChannelValue, Cha return f"{component}.{channel_name}" -def _channel_fqn(name: str, component: Optional[str]) -> str: - if component is None or len(component) == 0: - return name - else: - return f"{component}.{name}" - - def string_value(val: str) -> IngestWithConfigDataChannelValue: return IngestWithConfigDataChannelValue(string=val) diff --git a/python/pyproject.toml b/python/pyproject.toml index f8b2e66c..395ed068 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -20,17 +20,20 @@ keywords = [ "sift_py", ] dependencies = [ + "aiofiles==24.1.0", "typing-extensions==4.12.2", # https://grpc.github.io/grpc/python/ "grpcio==1.64.1", # https://googleapis.dev/python/protobuf/latest/ "types-protobuf==5.26.0.20240422", - "protobuf==5.27.1", + "protobuf==5.27.2", # https://grpc-interceptor.readthedocs.io "grpc-interceptor==0.15.4", # https://pyyaml.org/wiki/PyYAMLDocumentation "PyYAML==6.0.1", "types-PyYAML==6.0.12.20240311", + 
"pandas==2.2.2", + "pandas-stubs==2.2.2.240603", ] [project.urls] @@ -41,6 +44,7 @@ Changelog = "https://github.com/sift-stack/sift/tree/main/python/CHANGELOG.md" [project.optional-dependencies] development = [ + # static analysis "mypy==1.10.0", # testing tools @@ -48,6 +52,7 @@ development = [ "pytest-benchmark==4.0.0", "pytest-mock==3.14.0", "grpcio-testing==1.64.1", + "pytest-asyncio==0.23.7", # formatter + linter "ruff",