Add TDMS upload
marcsiftstack committed Oct 16, 2024
1 parent 6ff7ee7 commit 0a9e736
Showing 5 changed files with 223 additions and 0 deletions.
31 changes: 31 additions & 0 deletions python/examples/data_import/tdms/main.py
@@ -0,0 +1,31 @@
import os

from dotenv import load_dotenv
from sift_py.data_import.tdms import TdmsUploadService
from sift_py.rest import SiftRestConfig

if __name__ == "__main__":
    """
    Example of uploading a TDMS file into Sift.
    """

    load_dotenv()

    sift_uri = os.getenv("SIFT_API_URI")
    assert sift_uri, "expected 'SIFT_API_URI' environment variable to be set"

    apikey = os.getenv("SIFT_API_KEY")
    assert apikey, "expected 'SIFT_API_KEY' environment variable to be set"

    asset_name = os.getenv("ASSET_NAME")
    assert asset_name, "expected 'ASSET_NAME' environment variable to be set"

    rest_config: SiftRestConfig = {
        "uri": sift_uri,
        "apikey": apikey,
    }

    tdms_upload_service = TdmsUploadService(rest_config)
    status = tdms_upload_service.upload("sample_data.tdms", asset_name, group_into_components=True)
    status.wait_until_complete()
    print("Upload example complete!")
2 changes: 2 additions & 0 deletions python/examples/data_import/tdms/requirements.txt
@@ -0,0 +1,2 @@
python-dotenv
sift-stack-py
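
Assuming a standard Python environment, installing and running the example looks something like:

    pip install -r requirements.txt
    python main.py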
Binary file added python/examples/data_import/tdms/sample_data.tdms
189 changes: 189 additions & 0 deletions python/lib/sift_py/data_import/tdms.py
@@ -0,0 +1,189 @@
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Dict, List, Optional, Union

from nptdms import RootObject, TdmsChannel, TdmsFile, TdmsWriter, types # type: ignore

from sift_py.data_import._config import DataColumn, TimeColumn
from sift_py.data_import.config import CsvConfig
from sift_py.data_import.csv import CsvUploadService
from sift_py.data_import.status import DataImportService
from sift_py.data_import.time_format import TimeFormatType
from sift_py.ingestion.channel import ChannelDataType
from sift_py.rest import SiftRestConfig

TDMS_TO_SIFT_TYPES = {
    types.Boolean: ChannelDataType.BOOL,
    types.Int8: ChannelDataType.INT_32,
    types.Int16: ChannelDataType.INT_32,
    types.Int32: ChannelDataType.INT_32,
    types.Int64: ChannelDataType.INT_64,
    types.Uint8: ChannelDataType.UINT_32,
    types.Uint16: ChannelDataType.UINT_32,
    types.Uint32: ChannelDataType.UINT_32,
    types.Uint64: ChannelDataType.UINT_64,
    types.SingleFloat: ChannelDataType.FLOAT,
    types.DoubleFloat: ChannelDataType.DOUBLE,
}


class TdmsUploadService:
    """
    Service to upload TDMS files.
    """

    _csv_upload_service: CsvUploadService

    def __init__(self, rest_conf: SiftRestConfig):
        self._csv_upload_service = CsvUploadService(rest_conf)

    def upload(
        self,
        path: Union[str, Path],
        asset_name: str,
        group_into_components: bool = False,
        ignore_errors: bool = False,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> DataImportService:
        """
        Uploads the TDMS file pointed to by `path` to the specified asset.

        Set `group_into_components` to True to upload each TDMS group as a Sift component.
        If `ignore_errors` is True, channels without timing information are skipped
        instead of raising an error.
        Override `run_name` to specify the name of the run to create for this data. Default is None.
        Override `run_id` to specify the id of the run to add this data to. Default is None.
        """
        posix_path = Path(path) if isinstance(path, str) else path

        if not posix_path.is_file():
            raise Exception(f"Provided path, '{path}', does not point to a regular file.")

        with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file:
            valid_channels = self._convert_to_csv(path, temp_file.name, ignore_errors)
            csv_config = self._create_csv_config(
                valid_channels, asset_name, group_into_components, run_name, run_id
            )
            return self._csv_upload_service.upload(temp_file.name, csv_config)

    def _convert_to_csv(
        self, src_path: Union[str, Path], dst_path: Union[str, Path], ignore_errors: bool
    ) -> List[TdmsChannel]:
        """Converts the TDMS file to a temporary CSV on disk that we will upload.

        Returns the valid channels after parsing the TDMS file. Valid channels contain
        timing information.
        """

        def contains_timing(channel: TdmsChannel) -> bool:
            """Returns true if the TDMS channel contains timing information."""
            return all(
                [
                    "wf_increment" in channel.properties,
                    "wf_start_time" in channel.properties,
                    "wf_start_offset" in channel.properties,
                ]
            )

        def normalize_channel_name(channel_name: str) -> str:
            """Normalize channel names by replacing invalid characters and collapsing whitespace."""
            return " ".join(channel_name.replace("/", " ").split())

        src_file = TdmsFile(src_path)

        original_groups = src_file.groups()
        valid_channels: List[TdmsChannel] = []
        for group in original_groups:
            for channel in group.channels():
                if contains_timing(channel):
                    valid_channels.append(channel)
                else:
                    if ignore_errors:
                        print(
                            f"{group.name}:{channel.name} does not contain timing information. Skipping."
                        )
                    else:
                        raise Exception(
                            f"{group.name}:{channel.name} does not contain timing information. "
                            "Set `ignore_errors` to True to skip channels without timing information."
                        )

        # Write out the new TDMS file with invalid channels removed, then convert to CSV.
        with NamedTemporaryFile(mode="w") as f:
            with TdmsWriter(f.name) as tdms_writer:
                root_object = RootObject(src_file.properties)
                tdms_writer.write_segment([root_object] + original_groups + valid_channels)

            filtered_tdms_file = TdmsFile.read(f.name)
            df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True)

            updated_names = {
                original_name: normalize_channel_name(original_name) for original_name in df.keys()
            }
            df.rename(updated_names, axis=1, inplace=True)
            df.to_csv(dst_path, encoding="utf-8")

        return valid_channels

    def _create_csv_config(
        self,
        channels: List[TdmsChannel],
        asset_name: str,
        group_into_components: bool,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> CsvConfig:
        """Construct a CsvConfig based on metadata within the TDMS file."""
        data_config: Dict[int, DataColumn] = {}
        # Data columns start in column 2 (1-indexed)
        first_data_column = 2
        for i, channel in enumerate(channels):
            try:
                data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True)
            except KeyError:
                data_type = None

            if data_type is None:
                raise Exception(f"{channel.name} data type not supported: {channel.data_type}")

            extra_info = ""
            for k, v in channel.properties.items():
                # Skip these since the csv config has dedicated fields for them.
                if k in ["description", "unit_string"]:
                    continue
                # Must convert datetime to a string
                elif k == "wf_start_time":
                    v = str(v)
                extra_info += f"{k}: {v}\n"

            channel_config = DataColumn(
                name=channel.name,
                data_type=data_type,
                description=f"{channel.properties.get('description')}\n{extra_info}",
                units=channel.properties.get("unit_string") or "",
            )
            if group_into_components and channel.group_name:
                channel_config.component = channel.group_name

            data_config[first_data_column + i] = channel_config

        config_info = {
            "asset_name": asset_name,
            "first_data_row": first_data_column,
            "time_column": TimeColumn(
                format=TimeFormatType.ABSOLUTE_DATETIME,
                column_number=1,
            ),
            "data_columns": data_config,
        }

        if run_name is not None:
            config_info["run_name"] = run_name

        if run_id is not None:
            config_info["run_id"] = run_id

        return CsvConfig(config_info)
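
A side note for context, not part of the commit: the three `wf_*` properties that `contains_timing` requires are the standard TDMS waveform timing attributes, and together they determine each sample's absolute timestamp; this is what `as_dataframe(time_index=True, absolute_time=True)` relies on when building the time index. A rough sketch of the relationship (assuming `wf_start_time` arrives as a numpy-compatible datetime and the other two properties are in seconds):

import numpy as np
from nptdms import TdmsChannel

def waveform_timestamps(channel: TdmsChannel) -> np.ndarray:
    """Absolute timestamp for every sample in a waveform channel (sketch)."""
    props = channel.properties
    # The first sample lands at wf_start_time + wf_start_offset (seconds).
    start = np.datetime64(props["wf_start_time"], "ns") + np.timedelta64(
        int(props["wf_start_offset"] * 1e9), "ns"
    )
    # Subsequent samples are spaced wf_increment seconds apart.
    step = np.timedelta64(int(props["wf_increment"] * 1e9), "ns")
    return start + step * np.arange(len(channel))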
1 change: 1 addition & 0 deletions python/pyproject.toml
@@ -22,6 +22,7 @@ keywords = [
]
dependencies = [
"grpcio~=1.64",
"npTDMS~=1.9",
"PyYAML~=6.0",
"pandas~=2.0",
"protobuf~=5.26",
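One note on the new dependency: `npTDMS~=1.9` is a compatible-release specifier, equivalent to `npTDMS >=1.9, <2.0`, consistent with how the neighboring dependencies are pinned.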
