Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python(bug): fix data download bug for channels with '.' delimited name #138

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions python/lib/sift_py/data/_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,25 @@ async def test_data_service_execute_regular_channels(mocker: MockFixture):
channel_name="gpio",
run_name="[NostromoLV426].1720141748.047512",
),
ChannelQuery(
channel_name="valve.pressure",
run_name="[NostromoLV426].1720141748.047512",
),
],
)

result = await data_service.execute(query)

mock_get_asset = mocks["mock_get_asset_by_name"]
mock_get_channels = mocks["mock_get_channels_by_asset_id_and_channel_fqns"]
mock_get_channels = mocks["mock_get_channels_by_asset_id"]
mock_get_runs = mocks["mock_get_runs_by_names"]

mock_get_asset.assert_called_once()
mock_get_channels.assert_called_once()
mock_get_runs.assert_called_once()

# bit field elements count as separate channels
assert len(result.all_channels()) == 3
assert len(result.all_channels()) == 4
assert not result.channel("velocity")
assert not result.channels("velocity")
assert len(result.channels("mainmotor.velocity")) == 1
Expand Down Expand Up @@ -111,6 +115,27 @@ async def test_data_service_execute_regular_channels(mocker: MockFixture):
assert len(gpio_heater.value_column()["gpio.heater"]) == 1
assert len(gpio_heater.value_column("heater")["heater"]) == 1

pressure = result.channel("valve.pressure")
assert pressure is not None
assert len(pressure.timestamps) == 2
assert len(pressure.time_column()["time"]) == 2
assert len(pressure.time_column("custom_column_name")["custom_column_name"]) == 2
assert len(pressure.value_column()["valve.pressure"]) == 2
assert len(pressure.value_column("custom_column_name")["custom_column_name"]) == 2

all_columns = pressure.columns()
assert len(all_columns) == 2
assert len(all_columns["time"]) == 2
assert len(all_columns["valve.pressure"]) == 2

all_columns_custom = pressure.columns(
time_column_name="ts",
value_column_name="valve.pressure",
)
assert len(all_columns_custom) == 2
assert len(all_columns_custom["ts"]) == 2
assert len(all_columns_custom["valve.pressure"]) == 2


@contextmanager
def patch_grpc_calls_channels(mocker: MockFixture) -> Iterator[Dict[str, MockType]]:
Expand All @@ -119,10 +144,8 @@ def patch_grpc_calls_channels(mocker: MockFixture) -> Iterator[Dict[str, MockTyp
asset_id="b7955799-9893-4acf-bf14-50052284020c", name="NostromoLV428"
)

mock__get_channels_by_asset_id_and_channel_fqns = mocker.patch.object(
DataService, "_get_channels_by_asset_id_and_channel_fqns"
)
mock__get_channels_by_asset_id_and_channel_fqns.return_value = [
mock__get_channels_by_asset_id = mocker.patch.object(DataService, "_get_channels_by_asset_id")
mock__get_channels_by_asset_id.return_value = [
Channel(
channel_id="e8662647-12f7-465f-85dc-cb02513944e0",
name="velocity",
Expand All @@ -134,6 +157,11 @@ def patch_grpc_calls_channels(mocker: MockFixture) -> Iterator[Dict[str, MockTyp
name="gpio",
data_type=CHANNEL_DATA_TYPE_BIT_FIELD,
),
Channel(
channel_id="87e25141-ed3e-4538-b063-c3eac30838cd",
name="valve.pressure",
data_type=CHANNEL_DATA_TYPE_DOUBLE,
),
]

mock__get_runs_by_names = mocker.patch.object(DataService, "_get_runs_by_names")
Expand All @@ -147,7 +175,7 @@ def patch_grpc_calls_channels(mocker: MockFixture) -> Iterator[Dict[str, MockTyp
time_a = "2024-07-04T18:09:08.555-07:00"
time_b = "2024-07-04T18:09:09.555-07:00"

double_values = DoubleValues(
velocity_values = DoubleValues(
metadata=Metadata(
data_type=CHANNEL_DATA_TYPE_DOUBLE,
channel=Metadata.Channel(name="velocity", component="mainmotor"),
Expand All @@ -164,8 +192,31 @@ def patch_grpc_calls_channels(mocker: MockFixture) -> Iterator[Dict[str, MockTyp
],
)

raw_double_values = Any()
raw_double_values.Pack(double_values)
raw_velocity_values = Any()
raw_velocity_values.Pack(velocity_values)

time_a = "2024-07-04T18:09:08.555-07:00"
time_b = "2024-07-04T18:09:09.555-07:00"

pressure_values = DoubleValues(
metadata=Metadata(
data_type=CHANNEL_DATA_TYPE_DOUBLE,
channel=Metadata.Channel(name="valve.pressure"),
),
values=[
DoubleValue(
timestamp=to_timestamp_pb(time_a),
value=10,
),
DoubleValue(
timestamp=to_timestamp_pb(time_b),
value=11,
),
],
)

raw_pressure_values = Any()
raw_pressure_values.Pack(pressure_values)

bit_field_values = BitFieldValues(
metadata=Metadata(
Expand Down Expand Up @@ -213,12 +264,13 @@ def patch_grpc_calls_channels(mocker: MockFixture) -> Iterator[Dict[str, MockTyp

mock__get_data = mocker.patch.object(DataService, "_get_data")
mock__get_data.side_effect = [
[[raw_double_values]],
[[raw_velocity_values]],
[[raw_bit_field_values]],
[[raw_pressure_values]],
]
yield {
"mock_get_asset_by_name": mock__get_asset_by_name,
"mock_get_runs_by_names": mock__get_runs_by_names,
"mock_get_channels_by_asset_id_and_channel_fqns": mock__get_channels_by_asset_id_and_channel_fqns,
"mock_get_channels_by_asset_id": mock__get_channels_by_asset_id,
"mock_get_data": mock__get_data,
}
46 changes: 24 additions & 22 deletions python/lib/sift_py/data/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,19 @@ async def execute(self, query: DataQuery, bust_cache: bool = False) -> DataQuery

asset = await self._load_asset(query.asset_name)

channel_fqns = []

channel_queries: List[ChannelQuery] = []
for c in query.channels:
if isinstance(c, ChannelQuery):
channel_fqns.append(channel_fqn(c.channel_name, c.component))
channel_queries.append(c)
elif isinstance(c, CalculatedChannelQuery):
for ref in c.expression_channel_references:
channel_fqns.append(channel_fqn(ref["channel_name"], ref.get("component")))
channel_name = ref["channel_name"]
component = ref.get("component")
channel_queries.append(
ChannelQuery(channel_name=channel_name, component=component)
)

channels = await self._load_channels(asset, channel_fqns)
channels = await self._load_channels(asset, channel_queries)
runs = await self._load_runs(query.channels)

queries: List[Query] = []
Expand Down Expand Up @@ -307,12 +310,12 @@ async def _load_asset(self, asset_name: str) -> Asset:
return asset

async def _load_channels(
self, asset: Asset, channel_fqns: List[str]
self,
asset: Asset,
channel_queries: List[ChannelQuery],
) -> Dict[ChannelFqn, List[Channel]]:
if self._cached_channels.get(asset.name) is None:
sift_channels = await self._get_channels_by_asset_id_and_channel_fqns(
asset.asset_id, channel_fqns
)
sift_channels = await self._get_channels_by_asset_id(asset.asset_id, channel_queries)

channels = defaultdict(list)

Expand All @@ -322,17 +325,16 @@ async def _load_channels(
self._cached_channels[asset.name] = channels
return self._cached_channels[asset.name]

chans = self._cached_channels[asset.name]

channels_to_retrieve = []

for fqn in channel_fqns:
if chans.get(fqn) is None:
channels_to_retrieve.append(fqn)
cached_channels = self._cached_channels[asset.name]
channels_to_retrieve: List[ChannelQuery] = []
for query in channel_queries:
fqn = channel_fqn(query.channel_name, query.component)
if cached_channels.get(fqn) is None:
channels_to_retrieve.append(query)

sift_channels = []
if len(channels_to_retrieve) > 0:
sift_channels = await self._get_channels_by_asset_id_and_channel_fqns(
sift_channels = await self._get_channels_by_asset_id(
asset.asset_id, channels_to_retrieve
)

Expand Down Expand Up @@ -422,18 +424,18 @@ async def _get_runs_by_names(self, run_names: Set[str]) -> List[Run]:

return runs

async def _get_channels_by_asset_id_and_channel_fqns(
self, asset_id: str, channel_fqns: List[str]
async def _get_channels_by_asset_id(
self, asset_id: str, channel_queries: List[ChannelQuery]
) -> List[Channel]:
if len(asset_id) == 0 or len(channel_fqns) == 0:
if len(asset_id) == 0 or len(channel_queries) == 0:
return []

channels: List[Channel] = []

channel_names = []

for fqn in channel_fqns:
channel_names.append(fqn.split(".")[-1])
for query in channel_queries:
channel_names.append(query.channel_name)

name_in = cel_in("name", channel_names)

Expand Down
4 changes: 2 additions & 2 deletions rust/examples/ingestion/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

/// Channel and flow configuration used to create an ingestion config.
pub fn channel_configs() -> Vec<FlowConfig> {
return vec![FlowConfig {
vec![FlowConfig {
name: String::from("velocity_reading"),
channels: vec![ChannelConfig {
name: String::from("velocity"),
Expand All @@ -88,7 +88,7 @@ pub fn channel_configs() -> Vec<FlowConfig> {
data_type: ChannelDataType::Double.into(),
..Default::default()
}],
}];
}]
}

/// Retrieves an existing ingestion config or create it.
Expand Down
Loading