From c1301c8eb59a821a691178c9dfdfb65ab25f3981 Mon Sep 17 00:00:00 2001
From: solidiquis
Date: Fri, 12 Jul 2024 14:03:37 -0700
Subject: [PATCH] documentation

---
 python/lib/sift_py/__init__.py                 |  63 +++++++
 python/lib/sift_py/data/__init__.py            | 171 ++++++++++++++++++
 .../sift_py/data/{channel.py => _channel.py}   |   0
 .../data/{deserialize.py => _deserialize.py}   |   2 +-
 ...serialize_test.py => _deserialize_test.py}  |   2 +-
 python/lib/sift_py/data/query.py               |  84 +++++++--
 python/lib/sift_py/data/service.py             |  14 +-
 7 files changed, 320 insertions(+), 16 deletions(-)
 rename python/lib/sift_py/data/{channel.py => _channel.py} (100%)
 rename python/lib/sift_py/data/{deserialize.py => _deserialize.py} (99%)
 rename python/lib/sift_py/data/{deserialize_test.py => _deserialize_test.py} (98%)

diff --git a/python/lib/sift_py/__init__.py b/python/lib/sift_py/__init__.py
index bd888694..ab1d1526 100644
--- a/python/lib/sift_py/__init__.py
+++ b/python/lib/sift_py/__init__.py
@@ -16,6 +16,7 @@
 - [Sending data to Sift](#sending-data-to-sift)
   * [Ingestion Performance](#ingestion-performance)
     - [Buffered Ingestion](#buffered-ingestion)
+* [Downloading Telemetry](#downloading-telemetry)
 * [More Examples](#more-examples)
 
 ## Introduction
@@ -846,6 +847,68 @@ def nostromos_lv_426() -> TelemetryConfig:
 Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion`
 function definition for further details.
 
+## Downloading Telemetry
+
+To download your telemetry locally, you'll want to use the `sift_py.data` module. The module-level
+documentation contains more details, but here is an example script demonstrating how to download data
+for multiple channels, put it into a `pandas` data frame, and write the results out to a CSV:
+
+```python
+import asyncio
+import functools
+import pandas as pd
+from sift_py.data.query import ChannelQuery, DataQuery
+from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
+from sift_py.data.service import DataService
+
+
+async def channel_demo():
+    channel_config: SiftChannelConfig = {
+        "apikey": "my-key",
+        "uri": "sift-uri"
+    }
+
+    async with use_sift_async_channel(channel_config) as channel:
+        data_service = DataService(channel)
+
+        query = DataQuery(
+            asset_name="NostromoLV426",
+            start_time="2024-07-04T18:09:08.555-07:00",
+            end_time="2024-07-04T18:09:11.556-07:00",
+            channels=[
+                ChannelQuery(
+                    channel_name="voltage",
+                    run_name="[NostromoLV426].1720141748.047512"
+                ),
+                ChannelQuery(
+                    channel_name="velocity",
+                    component="mainmotors",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+                ChannelQuery(
+                    channel_name="gpio",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+            ],
+        )
+
+        result = await data_service.execute(query)
+
+        data_frames = [
+            pd.DataFrame(data.columns())
+            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
+        ]
+
+        merged_frame = functools.reduce(
+            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
+        )
+
+        merged_frame.to_csv("my_csv.csv")
+
+if __name__ == "__main__":
+    asyncio.run(channel_demo())
+```
+
 ## More Examples
 
 For more comphrensive examples demonstrating a little bit of everything, you may
diff --git a/python/lib/sift_py/data/__init__.py b/python/lib/sift_py/data/__init__.py
index e69de29b..c086c447 100644
--- a/python/lib/sift_py/data/__init__.py
+++ b/python/lib/sift_py/data/__init__.py
@@ -0,0 +1,171 @@
+"""
+This module contains tools to download telemetry from the Sift data API.
+The core component of this module is the `sift_py.data.service.DataService` class and the
+`sift_py.data.query` module. The former is used to execute a data query, while the latter
+is used to construct the query itself. A typical query looks something like this:
+
+```python
+query = DataQuery(
+    asset_name="NostromoLV426",
+    start_time="2024-07-04T18:09:08.555-07:00",
+    end_time="2024-07-04T18:09:11.556-07:00",
+    sample_ms=16,
+    channels=[
+        ChannelQuery(
+            channel_name="voltage",
+            run_name="[NostromoLV426].1720141748.047512"
+        ),
+        ChannelQuery(
+            channel_name="velocity",
+            component="mainmotors",
+            run_name="[NostromoLV426].1720141748.047512",
+        ),
+        ChannelQuery(
+            channel_name="gpio",
+            run_name="[NostromoLV426].1720141748.047512",
+        ),
+    ],
+)
+```
+
+This query, once passed to the `sift_py.data.service.DataService.execute` method, will
+fetch data between `start_time` and `end_time` at the sampling rate given by `sample_ms`.
+
+> ⚠️ **Warning**: Note on Performance
+>
+> Currently the results of a query are all buffered in memory, so it is best to be mindful
+> of your memory limitations and overall performance requirements when requesting data
+> over a large time range with a slow sampling rate. Full-fidelity data is returned
+> when `sample_ms` is set to `0`.
+
+The data API allows you to download telemetry for both regular channels and calculated
+channels. The following examples demonstrate each case:
+
+* [Regular Channels](#regular-channels)
+* [Calculated Channels](#calculated-channels)
+
+## Regular Channels
+
+```python
+import asyncio
+import functools
+import pandas as pd
+from sift_py.data.query import ChannelQuery, DataQuery
+from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
+from sift_py.data.service import DataService
+
+
+async def channel_demo():
+    channel_config: SiftChannelConfig = {
+        "apikey": "my-key",
+        "uri": "sift-uri"
+    }
+
+    async with use_sift_async_channel(channel_config) as channel:
+        data_service = DataService(channel)
+
+        query = DataQuery(
+            asset_name="NostromoLV426",
+            start_time="2024-07-04T18:09:08.555-07:00",
+            end_time="2024-07-04T18:09:11.556-07:00",
+            channels=[
+                ChannelQuery(
+                    channel_name="voltage",
+                    run_name="[NostromoLV426].1720141748.047512"
+                ),
+                ChannelQuery(
+                    channel_name="velocity",
+                    component="mainmotors",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+                ChannelQuery(
+                    channel_name="gpio",
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+            ],
+        )
+
+        result = await data_service.execute(query)
+
+        data_frames = [
+            pd.DataFrame(data.columns())
+            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
+        ]
+
+        merged_frame = functools.reduce(
+            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
+        )
+
+        merged_frame.to_csv("my_csv.csv")
+
+if __name__ == "__main__":
+    asyncio.run(channel_demo())
+```
+
+## Calculated Channels
+
+```python
+import asyncio
+import functools
+import pandas as pd
+from sift_py.data.query import CalculatedChannelQuery, DataQuery
+from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
+from sift_py.data.service import DataService
+
+
+async def channel_demo():
+    channel_config: SiftChannelConfig = {
+        "apikey": "my-key",
+        "uri": "sift-uri"
+    }
+
+    async with use_sift_async_channel(channel_config) as channel:
+        data_service = DataService(channel)
+
+        query = DataQuery(
+            asset_name="NostromoLV426",
+            start_time="2024-07-04T18:09:08.555-07:00",
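+            # Note: start_time/end_time accept ISO 8601 strings like these, as well as
+            # datetime, pandas Timestamp, and protobuf Timestamp values.
+            # sample_ms is omitted here, so it defaults to 0 (full-fidelity data).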
+            end_time="2024-07-04T18:09:11.556-07:00",
+            channels=[
+                CalculatedChannelQuery(
+                    channel_key="calc-voltage",
+                    expression="$1 + 10",
+                    expression_channel_references=[
+                        {
+                            "reference": "$1",
+                            "channel_name": "voltage",
+                        },
+                    ],
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+                CalculatedChannelQuery(
+                    channel_key="calc-velocity",
+                    expression="$1 * 2",
+                    expression_channel_references=[
+                        {
+                            "reference": "$1",
+                            "channel_name": "velocity",
+                            "component": "mainmotors",
+                        },
+                    ],
+                    run_name="[NostromoLV426].1720141748.047512",
+                ),
+            ],
+        )
+
+        result = await data_service.execute(query)
+        calc_voltage, calc_velocity = result.channels("calc-voltage", "calc-velocity")
+
+        calc_voltage_df = pd.DataFrame(calc_voltage.columns())
+        calc_velocity_df = pd.DataFrame(calc_velocity.columns())
+
+        merged_frame = pd.merge_asof(calc_voltage_df, calc_velocity_df, on="time")
+
+        merged_frame.to_csv("my_csv.csv")
+
+if __name__ == "__main__":
+    asyncio.run(channel_demo())
+```
+"""
diff --git a/python/lib/sift_py/data/channel.py b/python/lib/sift_py/data/_channel.py
similarity index 100%
rename from python/lib/sift_py/data/channel.py
rename to python/lib/sift_py/data/_channel.py
diff --git a/python/lib/sift_py/data/deserialize.py b/python/lib/sift_py/data/_deserialize.py
similarity index 99%
rename from python/lib/sift_py/data/deserialize.py
rename to python/lib/sift_py/data/_deserialize.py
index 12392591..709c9164 100644
--- a/python/lib/sift_py/data/deserialize.py
+++ b/python/lib/sift_py/data/_deserialize.py
@@ -18,7 +18,7 @@
 )
 
 from sift_py._internal.time import to_timestamp_nanos
-from sift_py.data.channel import ChannelTimeSeries
+from sift_py.data._channel import ChannelTimeSeries
 from sift_py.error import SiftError
 from sift_py.ingestion.channel import ChannelDataType
 
diff --git a/python/lib/sift_py/data/deserialize_test.py b/python/lib/sift_py/data/_deserialize_test.py
similarity index 98%
rename from python/lib/sift_py/data/deserialize_test.py
rename to python/lib/sift_py/data/_deserialize_test.py
index c3b5ad94..78fcfc2e 100644
--- a/python/lib/sift_py/data/deserialize_test.py
+++ b/python/lib/sift_py/data/_deserialize_test.py
@@ -14,7 +14,7 @@
 )
 
 from sift_py._internal.time import to_timestamp_nanos, to_timestamp_pb
-from sift_py.data.deserialize import try_deserialize_channel_data
+from sift_py.data._deserialize import try_deserialize_channel_data
 
 
 def test_try_deserialize_channel_data_double():
diff --git a/python/lib/sift_py/data/query.py b/python/lib/sift_py/data/query.py
index 705f87c5..2a77b095 100644
--- a/python/lib/sift_py/data/query.py
+++ b/python/lib/sift_py/data/query.py
@@ -1,3 +1,11 @@
+"""
+Module containing utilities to construct a data query, which is ultimately
+passed to `sift_py.data.service.DataService.execute` to download telemetry.
+
+This module also contains types that represent the result of a data query,
+which can easily be converted into a `pandas` data frame or series.
+"""
+
 from __future__ import annotations
 
 from datetime import datetime
@@ -9,11 +17,26 @@
 
 from sift_py._internal.channel import channel_fqn
 from sift_py._internal.time import to_timestamp_nanos
-from sift_py.data.channel import ChannelTimeSeries
+from sift_py.data._channel import ChannelTimeSeries
 from sift_py.ingestion.channel import ChannelDataType
 
 
 class DataQuery:
+    """
+    A query that is meant to be passed to `sift_py.data.service.DataService.execute` to
+    retrieve telemetry.
+
+    - `asset_name`: The name of the asset to query telemetry for.
+    - `start_time`: The start of the time range to request data for.
+    - `end_time`: The end of the time range to request data for.
+    - `sample_ms`:
+        The sampling interval, in milliseconds, to use when retrieving data. The smaller the
+        interval, the greater the data fidelity. An interval of `0` retrieves full-fidelity data.
+    - `channels`:
+        A list of either `ChannelQuery` or `CalculatedChannelQuery` objects, but not a mix of
+        both. Represents the channels to retrieve data for.
+    """
+
     DEFAULT_PAGE_SIZE = 100_000
 
     asset_name: str
@@ -30,14 +53,15 @@
         end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
         channels: List[Union[ChannelQuery, CalculatedChannelQuery]],
         sample_ms: int = 0,
-        page_size: int = DEFAULT_PAGE_SIZE,
+        # Currently not in use outside of testing purposes.
+        _: int = DEFAULT_PAGE_SIZE,
     ):
         self.start_time = to_timestamp_nanos(start_time)
        self.end_time = to_timestamp_nanos(end_time)
         self.asset_name = asset_name
         self.sample_ms = sample_ms
         self.channels = channels
-        self.page_size = page_size
+        self.page_size = self.__class__.DEFAULT_PAGE_SIZE
 
 
 """
@@ -48,6 +72,10 @@
 
 
 class DataQueryResult:
+    """
+    The result of a data query, which may contain data for multiple channels.
+    """
+
     _result: Dict[str, List[ChannelTimeSeries]]
 
     def __init__(self, merged_channel_data: Dict[str, List[ChannelTimeSeries]]):
@@ -131,6 +159,10 @@ def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]:
         return result
 
     def all_channels(self) -> List[DataQueryResultSet]:
+        """
+        Returns all channel data.
+        """
+
         result = []
 
         for fqn, time_series in self._result.items():
@@ -161,6 +193,15 @@
 
 
 class DataQueryResultSet:
+    """
+    Represents time series data for a single channel. Can easily be converted into a `pandas` data frame like so:
+
+    ```python
+    pd.DataFrame(data_query_result_set.columns())
+    ```
+
+    """
+
     identifier: str
     timestamps: List[pd.Timestamp]
     values: List[Any]
 
     def __init__(self, identifier: str, timestamps: List[pd.Timestamp], values: List
         self.values = values
 
     def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
+        """
+        Returns a single key-value-pair dictionary meant to represent the value column of the data set.
+        `column_name` can be used to override the name of the column.
+        """
+
         if column_name is None:
             return {self.identifier: self.values}
         else:
             return {column_name: self.values}
 
     def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
+        """
+        Returns a single key-value-pair dictionary meant to represent the time column of the data set.
+        `column_name` can be used to override the name of the column.
+        """
         if column_name is None:
             return {"time": self.timestamps}
         else:
             return {column_name: self.timestamps}
 
     def columns(
         self,
         time_column_name: Optional[str] = None,
         value_column_name: Optional[str] = None,
     ) -> Dict[str, List[Any]]:
+        """
+        Returns both the time and value columns, with options to override the column names.
+        """
+
         cols = self.time_column(time_column_name)
         cols.update(self.value_column(value_column_name))
         return cols
 
 
 class ChannelQuery:
+    """
+    Represents a single channel to include in the `sift_py.data.query.DataQuery`.
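+
+    - `channel_name`: The name of the channel.
+    - `component`: The name of the component the channel belongs to, if any.
+    - `run_name`: The name of the run to scope the query to, if any.
+
+    For illustration, reusing the example values from the `sift_py.data` module documentation,
+    a channel that belongs to a component and is scoped to a single run might be constructed
+    like so:
+
+    ```python
+    ChannelQuery(
+        channel_name="velocity",
+        component="mainmotors",
+        run_name="[NostromoLV426].1720141748.047512",
+    )
+    ```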
+    """
+
     channel_name: str
     component: Optional[str]
     run_name: Optional[str]
 
     def fqn(self) -> str:
         return channel_fqn(self.channel_name, self.component)
 
 
+class ExpressionChannelReference(TypedDict):
+    reference: str
+    channel_name: str
+    component: NotRequired[str]
+    data_type: NotRequired[ChannelDataType]
+
+
 class CalculatedChannelQuery:
-    ExpressionChannelReference = TypedDict(
-        "ExpressionChannelReference",
-        {
-            "reference": str,
-            "channel_name": str,
-            "component": NotRequired[str],
-            "data_type": NotRequired[ChannelDataType],
-        },
-    )
+    """
+    Represents a single calculated channel to include in the `sift_py.data.query.DataQuery`.
+    """
+
     channel_key: str
     expression: str
     expression_channel_references: List[ExpressionChannelReference]
diff --git a/python/lib/sift_py/data/service.py b/python/lib/sift_py/data/service.py
index ddec9b7b..cf939953 100644
--- a/python/lib/sift_py/data/service.py
+++ b/python/lib/sift_py/data/service.py
@@ -23,9 +23,9 @@
 from sift_py._internal.cel import cel_in
 from sift_py._internal.channel import channel_fqn
 from sift_py._internal.convert.timestamp import to_pb_timestamp
+from sift_py.data._channel import ChannelTimeSeries
+from sift_py.data._deserialize import try_deserialize_channel_data
 from sift_py.data._validate import validate_channel_reference
-from sift_py.data.channel import ChannelTimeSeries
-from sift_py.data.deserialize import try_deserialize_channel_data
 from sift_py.data.error import DataError
 from sift_py.data.query import CalculatedChannelQuery, ChannelQuery, DataQuery, DataQueryResult
 from sift_py.error import SiftError
@@ -34,6 +34,12 @@
 
 
 class DataService:
+    """
+    A service that asynchronously executes a `sift_py.data.query.DataQuery` to retrieve telemetry
+    for an arbitrary number of channels (or calculated channels) over a user-specified time range
+    and at a user-specified sampling rate.
+    """
+
     # TODO: There is a pagination issue API side when requesting multiple channels in single request.
     # If all data points for all channels in a single request don't fit into a single page, then
     # paging seems to omit all but a single channel. We can increase this batch size once that issue
@@ -66,6 +72,10 @@ def __init__(self, channel: SiftAsyncChannel):
         self._cached_runs = {}
 
     async def execute(self, query: DataQuery, bust_cache: bool = False) -> DataQueryResult:
+        """
+        Performs the actual query to retrieve telemetry.
+        """
+
         if bust_cache:
             self._bust_cache()