calculated channels wip
solidiquis committed Jul 11, 2024
1 parent 85c5a13 · commit 1c5a138
Showing 3 changed files with 125 additions and 59 deletions.
python/lib/sift_py/data/deserialize.py (120 changes: 89 additions & 31 deletions)
@@ -1,8 +1,10 @@
+from copy import deepcopy
 from enum import Enum
-from typing import Tuple, cast
+from typing import List, Tuple, cast

 from google.protobuf.any_pb2 import Any
 from sift.data.v1.data_pb2 import (
+    BitFieldValues,
     BoolValues,
     DoubleValues,
     EnumValues,
@@ -34,117 +36,173 @@ class ChannelValues(Enum):
     UINT64_VALUES = "sift.data.v1.Uint64Values"


-def try_deserialize_channel_data(channel_values: Any) -> Tuple[Metadata, ChannelTimeSeries]:
+def try_deserialize_channel_data(channel_values: Any) -> List[Tuple[Metadata, ChannelTimeSeries]]:
     if channel_values.type_url == ChannelValues.DOUBLE_VALUES.value:
         double_values = cast(DoubleValues, DoubleValues.FromString(channel_values.value))
         metadata = double_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in double_values.values]
-        double_value_column = [v.value for v in double_values.values]
+        time_column = []
+        double_value_column = []
+
+        for v in double_values.values:
+            time_column.append(to_datetime(v.timestamp))
+            double_value_column.append(v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, double_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.FLOAT_VALUES.value:
         float_values = cast(FloatValues, FloatValues.FromString(channel_values.value))
         metadata = float_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in float_values.values]
-        float_value_column = [v.value for v in float_values.values]
+        time_column = []
+        float_value_column = []
+
+        for float_v in float_values.values:
+            time_column.append(to_datetime(float_v.timestamp))
+            float_value_column.append(float_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, float_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.STRING_VALUES.value:
         string_values = cast(StringValues, StringValues.FromString(channel_values.value))
         metadata = string_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in string_values.values]
-        string_value_column = [v.value for v in string_values.values]
+        time_column = []
+        string_value_column = []
+
+        for string_v in string_values.values:
+            time_column.append(to_datetime(string_v.timestamp))
+            string_value_column.append(string_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, string_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.ENUM_VALUES.value:
         enum_values = cast(EnumValues, EnumValues.FromString(channel_values.value))
         metadata = enum_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in enum_values.values]
-        enum_value_column = [v.value for v in enum_values.values]
+        time_column = []
+        enum_value_column = []
+
+        for enum_v in enum_values.values:
+            time_column.append(to_datetime(enum_v.timestamp))
+            enum_value_column.append(enum_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, enum_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.BOOL_VALUES.value:
         bool_values = cast(BoolValues, BoolValues.FromString(channel_values.value))
         metadata = bool_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in bool_values.values]
-        bool_value_column = [v.value for v in bool_values.values]
+        time_column = []
+        bool_value_column = []
+
+        for bool_v in bool_values.values:
+            time_column.append(to_datetime(bool_v.timestamp))
+            bool_value_column.append(bool_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, bool_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.INT32_VALUES.value:
         int32_values = cast(Int32Values, Int32Values.FromString(channel_values.value))
         metadata = int32_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in int32_values.values]
-        int32_value_column = [v.value for v in int32_values.values]
+        time_column = []
+        int32_value_column = []
+
+        for int32_v in int32_values.values:
+            time_column.append(to_datetime(int32_v.timestamp))
+            int32_value_column.append(int32_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, int32_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.INT64_VALUES.value:
         int64_values = cast(Int64Values, Int64Values.FromString(channel_values.value))
         metadata = int64_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in int64_values.values]
-        int64_value_column = [v.value for v in int64_values.values]
+        time_column = []
+        int64_value_column = []
+
+        for int64_v in int64_values.values:
+            time_column.append(to_datetime(int64_v.timestamp))
+            int64_value_column.append(int64_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, int64_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.UINT32_VALUES.value:
         uint32_values = cast(Uint32Values, Uint32Values.FromString(channel_values.value))
         metadata = uint32_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in uint32_values.values]
-        uint32_value_column = [v.value for v in uint32_values.values]
+        time_column = []
+        uint32_value_column = []
+
+        for uint32_v in uint32_values.values:
+            time_column.append(to_datetime(uint32_v.timestamp))
+            uint32_value_column.append(uint32_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, uint32_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.UINT64_VALUES.value:
         uint64_values = cast(Uint64Values, Uint64Values.FromString(channel_values.value))
         metadata = uint64_values.metadata

-        time_column = [to_datetime(v.timestamp) for v in uint64_values.values]
-        uint64_value_column = [v.value for v in uint64_values.values]
+        time_column = []
+        uint64_value_column = []
+
+        for uint64_v in uint64_values.values:
+            time_column.append(to_datetime(uint64_v.timestamp))
+            uint64_value_column.append(uint64_v.value)

         time_series = ChannelTimeSeries(
             ChannelDataType.from_pb(metadata.data_type), time_column, uint64_value_column
         )
-        return metadata, time_series
+        return [(metadata, time_series)]

     elif channel_values.type_url == ChannelValues.BIT_FIELD_VALUES.value:
-        # Handle deserialization for BIT_FIELD_VALUES
-        pass
+        bit_field_values = cast(BitFieldValues, BitFieldValues.FromString(channel_values.value))
+        metadata = bit_field_values.metadata
+        data_type = ChannelDataType.from_pb(metadata.data_type)
+        channel_name = metadata.channel.name
+
+        parsed_data: List[Tuple[Metadata, ChannelTimeSeries]] = []
+
+        for bit_field_element in bit_field_values.values:
+            md_copy = deepcopy(bit_field_values.metadata)
+            md_copy.channel.name = f"{channel_name}.{bit_field_element.name}"
+
+            time_column = []
+            bit_field_el_column = []
+
+            for bf_v in bit_field_element.values:
+                time_column.append(to_datetime(bf_v.timestamp))
+                bit_field_el_column.append(bf_v.value)
+
+            time_series = ChannelTimeSeries(data_type, time_column, bit_field_el_column)
+            parsed_data.append((md_copy, time_series))
+
+        return parsed_data

     raise SiftError("Received an unknown channel-type.")
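
Net effect of the change above: try_deserialize_channel_data now always returns a list, and a bit-field channel fans out into one (metadata, series) pair per bit-field element, each renamed "<parent>.<element>". Below is a standalone sketch of that fan-out using plain dicts in place of the protobuf types; the "gpio" channel and its element names are made up for illustration.

    from copy import deepcopy

    # Stand-ins for the protobuf metadata; "gpio" and the element names are hypothetical.
    parent_metadata = {"channel": {"name": "gpio"}}
    elements = {"12v": [1, 0, 1], "heater": [0, 0, 1]}  # element name -> value column

    parsed_data = []
    for element_name, value_column in elements.items():
        md_copy = deepcopy(parent_metadata)  # mirrors deepcopy(bit_field_values.metadata)
        md_copy["channel"]["name"] = f"{parent_metadata['channel']['name']}.{element_name}"
        parsed_data.append((md_copy, value_column))

    print([md["channel"]["name"] for md, _ in parsed_data])
    # -> ['gpio.12v', 'gpio.heater']; scalar channel types still return a single-element list

Callers therefore iterate over the result instead of unpacking a single tuple, which is exactly what the service.py change further down does.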
python/lib/sift_py/data/query.py (5 changes: 3 additions & 2 deletions)
@@ -1,5 +1,7 @@
 from __future__ import annotations

+import uuid
+
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union, cast

@@ -204,12 +206,11 @@ class CalculatedChannelQuery:

     def __init__(
         self,
-        channel_key: str,
         run_name: str,
         expression: str,
         expression_channel_references: Dict[ChannelName, ChannelIdentifier],
     ):
-        self.channel_key = channel_key
+        self.channel_key = str(uuid.uuid4())
         self.run_name = run_name
         self.expression = expression
         self.expression_channel_references = expression_channel_references
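
With channel_key dropped from the signature, the key is generated internally and callers pass only the run, the expression, and the channel references. A minimal sketch, assuming sift_py is importable; the run name, expression, and the keys inside each ChannelIdentifier are hypothetical placeholders, not verified against the library:

    from sift_py.data.query import CalculatedChannelQuery

    query = CalculatedChannelQuery(
        run_name="thruster-test.run-42",  # hypothetical run name
        expression="$1 + $2",  # hypothetical expression syntax
        expression_channel_references={
            "$1": {"channel_name": "voltage", "component": "motor"},  # hypothetical fields
            "$2": {"channel_name": "current", "component": "motor"},
        },
    )
    print(query.channel_key)  # now a randomly generated UUID string, not caller-supplied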
python/lib/sift_py/data/service.py (59 changes: 33 additions & 26 deletions)
@@ -26,8 +26,10 @@


 class DataService:
-    # TODO: There is a pagination when there are multiple channels in the same query that needs to be resolved.
-    # In the mean time each channel will be in their own requests.
+    # TODO: There is a pagination issue API-side when requesting multiple channels in a single request.
+    # If all data points for all channels in a single request don't fit into a single page, then
+    # paging seems to omit all but a single channel. We can increase this batch size once that issue
+    # has been resolved. In the meantime each channel gets its own request.
     REQUEST_BATCH_SIZE = 1

     AssetName: TypeAlias = str
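
For illustration, here is what a batch size of 1 implies for request fan-out (standalone sketch; the channel names are made up, and the real request-building code lives elsewhere in DataService):

    channels = ["motor.voltage", "motor.current", "gpio"]

    batch_size = 1  # DataService.REQUEST_BATCH_SIZE
    batches = [channels[i : i + batch_size] for i in range(0, len(channels), batch_size)]
    print(batches)
    # -> [['motor.voltage'], ['motor.current'], ['gpio']]: one GetData request per channel,
    # sidestepping the pagination bug described in the TODO above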
@@ -152,39 +154,44 @@ async def _get_data(self, req: GetDataRequest) -> List[Iterable[Any]]:
     def _merge_and_sort_pages(
         self, pages: List[Iterable[Any]]
     ) -> Dict[str, List[ChannelTimeSeries]]:
         if len(pages) == 0:
             return {}

         merged_values_by_channel: Dict[str, List[ChannelTimeSeries]] = {}

         for page in pages:
             for raw_channel_values in page:
-                metadata, cvalues = try_deserialize_channel_data(cast(Any, raw_channel_values))
-                channel = metadata.channel
-                fqn = channel_fqn(channel.name, channel.component)
-
-                time_series = merged_values_by_channel.get(fqn)
-
-                if time_series is None:
-                    merged_values_by_channel[fqn] = [
-                        ChannelTimeSeries(
-                            data_type=cvalues.data_type,
-                            time_column=cvalues.time_column,
-                            value_column=cvalues.value_column,
-                        ),
-                    ]
-                else:
-                    for series in time_series:
-                        if series.data_type == cvalues.data_type:
-                            series.time_column.extend(cvalues.time_column)
-                            series.value_column.extend(cvalues.value_column)
-                            break
-                    else:  # for-else
-                        # Situation in which multiple channels with identical fully-qualified names but different types.
-                        time_series.append(
+                parsed_channel_data = try_deserialize_channel_data(cast(Any, raw_channel_values))
+
+                for metadata, cvalues in parsed_channel_data:
+                    channel = metadata.channel
+                    fqn = channel_fqn(channel.name, channel.component)
+
+                    time_series = merged_values_by_channel.get(fqn)
+
+                    if time_series is None:
+                        merged_values_by_channel[fqn] = [
+                            ChannelTimeSeries(
+                                data_type=cvalues.data_type,
+                                time_column=cvalues.time_column,
+                                value_column=cvalues.value_column,
+                            ),
+                        ]
+                    else:
+                        for series in time_series:
+                            if series.data_type == cvalues.data_type:
+                                series.time_column.extend(cvalues.time_column)
+                                series.value_column.extend(cvalues.value_column)
+                                break
+                        else:  # for-else
+                            # Situation in which multiple channels with identical fully-qualified names but different types.
+                            time_series.append(
+                                ChannelTimeSeries(
+                                    data_type=cvalues.data_type,
+                                    time_column=cvalues.time_column,
+                                    value_column=cvalues.value_column,
+                                )
+                            )

         for data in merged_values_by_channel.values():
             for channel_data in data:
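
Aside from the new inner loop over the deserialized list, the merge logic is unchanged: the for-else concatenates a page's columns onto an existing series of the same data type, and only appends a separate series when a channel with an identical fully-qualified name carries a different type. A standalone sketch of that behavior with stand-in types (not the real sift_py classes):

    from dataclasses import dataclass, field
    from typing import Dict, List

    @dataclass
    class Series:  # stand-in for ChannelTimeSeries
        data_type: str
        time_column: List[float] = field(default_factory=list)
        value_column: List[float] = field(default_factory=list)

    pages = [  # two pages for the same fully-qualified channel name
        [("motor.voltage", Series("double", [0.0], [3.3]))],
        [("motor.voltage", Series("double", [1.0], [3.4]))],
    ]

    merged: Dict[str, List[Series]] = {}
    for page in pages:
        for fqn, incoming in page:
            existing = merged.setdefault(fqn, [])
            for series in existing:
                if series.data_type == incoming.data_type:
                    series.time_column.extend(incoming.time_column)
                    series.value_column.extend(incoming.value_column)
                    break
            else:  # no same-typed series yet: keep it as a separate one
                existing.append(incoming)

    assert merged["motor.voltage"][0].time_column == [0.0, 1.0]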
