Skip to content

Commit

Permalink
python(feature): downloading telemetry (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis authored Jul 12, 2024
1 parent 25e2733 commit 79a8daf
Show file tree
Hide file tree
Showing 23 changed files with 2,008 additions and 8 deletions.
63 changes: 63 additions & 0 deletions python/lib/sift_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
- [Sending data to Sift](#sending-data-to-sift)
* [Ingestion Performance](#ingestion-performance)
- [Buffered Ingestion](#buffered-ingestion)
* [Downloading Telemetry](#downloading-telemetry)
* [More Examples](#more-examples)
## Introduction
Expand Down Expand Up @@ -846,6 +847,68 @@ def nostromos_lv_426() -> TelemetryConfig:
Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition
for further details.
## Downloading Telemetry
To download your telemetry locally you'll want to make use of the `sift_py.data` module. Them module-level documentation
contains more details, but here is an example script demonstrating how to download data for multiple channels, putting them
into a `pandas` data frame, and writing the results out to a CSV:
```python
import asyncio
import functools
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService
async def channel_demo():
channel_config: SiftChannelConfig = {
"apikey": "my-key"
"uri": "sift-uri"
}
async with use_sift_async_channel(channel_config) as channel:
data_service = DataService(channel)
query = DataQuery(
asset_name="NostromoLV426",
start_time="2024-07-04T18:09:08.555-07:00",
end_time="2024-07-04T18:09:11.556-07:00",
channels=[
ChannelQuery(
channel_name="voltage",
run_name="[NostromoLV426].1720141748.047512"
),
ChannelQuery(
channel_name="velocity",
component="mainmotors",
run_name="[NostromoLV426].1720141748.047512",
),
ChannelQuery(
channel_name="gpio",
run_name="[NostromoLV426].1720141748.047512",
),
],
)
result = await data_service.execute(query)
data_frames = [
pd.DataFrame(data.columns())
for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
]
merged_frame = functools.reduce(
lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
)
merged_frame.to_csv("my_csv.csv")
if __name__ == "__main__":
asyncio.run(example())
```
## More Examples
For more comphrensive examples demonstrating a little bit of everything, you may
Expand Down
18 changes: 18 additions & 0 deletions python/lib/sift_py/_internal/cel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
Utilities to interact with APIs that have a CEL-based interface.
"""

from typing import Iterable


def cel_in(field: str, values: Iterable[str]) -> str:
"""
Produces a list membership CEL expression. Example:
```python
> print(cel_in("name", ["foo", "bar"]))
name in ["foo", "bar"]
```
"""
items = ",".join([f'"{val}"' for val in values])
return f"{field} in [{items}]"
5 changes: 5 additions & 0 deletions python/lib/sift_py/_internal/channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from typing import Optional


def channel_fqn(name: str, component: Optional[str]) -> str:
return name if component is None or len(component) == 0 else f"{component}.{name}"
9 changes: 9 additions & 0 deletions python/lib/sift_py/_internal/convert/timestamp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from datetime import datetime

from google.protobuf.timestamp_pb2 import Timestamp


def to_pb_timestamp(timestamp: datetime) -> Timestamp:
timestamp_pb = Timestamp()
timestamp_pb.FromDatetime(timestamp)
return timestamp_pb
66 changes: 66 additions & 0 deletions python/lib/sift_py/_internal/test_util/channel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
from collections.abc import AsyncIterable, Callable, Iterable
from typing import Any, Optional, Union

import grpc
import grpc.aio as grpc_aio
from grpc.aio import Channel as AsyncChannel
from grpc_testing import Channel

SerializingFunction = Callable[[Any], bytes]
DeserializingFunction = Callable[[bytes], Any]
DoneCallbackType = Callable[[Any], None]
RequestIterableType = Union[Iterable[Any], AsyncIterable[Any]]
ResponseIterableType = AsyncIterable[Any]


class MockChannel(Channel):
"""
Expand Down Expand Up @@ -68,3 +80,57 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
pass


class MockAsyncChannel(AsyncChannel):
async def __aenter__(self):
pass

async def __aexit__(self, exc_type, exc_val, exc_tb):
pass

async def close(self, grace: Optional[float] = None):
pass

def get_state(self, try_to_connect: bool = False) -> grpc.ChannelConnectivity: ...

async def wait_for_state_change(
self,
last_observed_state: grpc.ChannelConnectivity,
) -> None:
return None

async def channel_ready(self) -> None:
return None

def unary_unary(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
_registered_method: Optional[bool] = False,
) -> grpc_aio.UnaryUnaryMultiCallable: ...

def unary_stream(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
_registered_method: Optional[bool] = False,
) -> grpc_aio.UnaryStreamMultiCallable: ...

def stream_unary(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
_registered_method: Optional[bool] = False,
) -> grpc_aio.StreamUnaryMultiCallable: ...

def stream_stream(
self,
method: str,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
_registered_method: Optional[bool] = False,
) -> grpc_aio.StreamStreamMultiCallable: ...
48 changes: 48 additions & 0 deletions python/lib/sift_py/_internal/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from datetime import datetime, timezone
from typing import Union, cast

import pandas as pd
from google.protobuf.timestamp_pb2 import Timestamp as TimestampPb


def to_timestamp_nanos(arg: Union[TimestampPb, pd.Timestamp, datetime, str, int]) -> pd.Timestamp:
"""
Converts a variety of time-types to a pandas timestamp which supports nano-second precision.
"""

if isinstance(arg, pd.Timestamp):
return arg
elif isinstance(arg, TimestampPb):
seconds = arg.seconds
nanos = arg.nanos

dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
ts = pd.Timestamp(dt)

return cast(pd.Timestamp, ts + pd.Timedelta(nanos, unit="ns"))

elif isinstance(arg, int):
dt = datetime.fromtimestamp(arg, tz=timezone.utc)
return cast(pd.Timestamp, pd.Timestamp(dt))

else:
return cast(pd.Timestamp, pd.Timestamp(arg))


def to_timestamp_pb(arg: Union[datetime, str, int]) -> TimestampPb:
"""
Mainly used for testing at the moment. If using this for non-testing purposes
should probably make this more robust and support nano-second precision.
"""

ts = TimestampPb()

if isinstance(arg, datetime):
ts.FromDatetime(arg)
return ts
elif isinstance(arg, int):
ts.FromDatetime(datetime.fromtimestamp(arg))
return ts
else:
ts.FromDatetime(datetime.fromisoformat(arg))
return ts
Loading

0 comments on commit 79a8daf

Please sign in to comment.