From 50b2e7e8944645edf8ec62d18f04a77d1ab7b17d Mon Sep 17 00:00:00 2001 From: Ailin Yu Date: Thu, 19 Sep 2024 17:53:24 -0500 Subject: [PATCH] python(chore): Add multiple CSV ingestion example (#98) --- .../{ => multi_csv}/.env-example | 0 .../multi_csv/channel_a.csv | 23 ++++ .../multi_csv/channel_b.csv | 20 +++ .../multi_csv/channel_c.csv | 22 ++++ .../ingestion_with_csv/multi_csv/main.py | 122 ++++++++++++++++++ .../{ => multi_csv}/requirements.txt | 0 .../single_csv/.env-example | 4 + .../{ => single_csv}/main.py | 0 .../single_csv/requirements.txt | 2 + .../{ => single_csv}/sample_data.csv | 0 10 files changed, 193 insertions(+) rename python/examples/ingestion_with_csv/{ => multi_csv}/.env-example (100%) create mode 100644 python/examples/ingestion_with_csv/multi_csv/channel_a.csv create mode 100644 python/examples/ingestion_with_csv/multi_csv/channel_b.csv create mode 100644 python/examples/ingestion_with_csv/multi_csv/channel_c.csv create mode 100644 python/examples/ingestion_with_csv/multi_csv/main.py rename python/examples/ingestion_with_csv/{ => multi_csv}/requirements.txt (100%) create mode 100644 python/examples/ingestion_with_csv/single_csv/.env-example rename python/examples/ingestion_with_csv/{ => single_csv}/main.py (100%) create mode 100644 python/examples/ingestion_with_csv/single_csv/requirements.txt rename python/examples/ingestion_with_csv/{ => single_csv}/sample_data.csv (100%) diff --git a/python/examples/ingestion_with_csv/.env-example b/python/examples/ingestion_with_csv/multi_csv/.env-example similarity index 100% rename from python/examples/ingestion_with_csv/.env-example rename to python/examples/ingestion_with_csv/multi_csv/.env-example diff --git a/python/examples/ingestion_with_csv/multi_csv/channel_a.csv b/python/examples/ingestion_with_csv/multi_csv/channel_a.csv new file mode 100644 index 00000000..10f7099a --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/channel_a.csv @@ -0,0 +1,23 @@ +time,channel_a +2024-01-28T09:12:44.531252-08:00,0.22632408240649787 +2024-01-28T09:12:45.531252-08:00,0.2507473982956536 +2024-01-28T09:12:46.531252-08:00,0.569768335603046 +2024-01-28T09:12:47.531252-08:00,0.11603978260515428 +2024-01-28T09:12:48.531252-08:00,0.7769021894063354 +2024-01-28T09:12:49.531252-08:00,0.925130023325051 +2024-01-28T09:12:50.531252-08:00,0.5742981300587023 +2024-01-28T09:12:51.531252-08:00,0.5838714673841429 +2024-01-28T09:12:52.531252-08:00,0.6445476967910116 +2024-01-28T09:12:53.531252-08:00,0.6168050494395134 +2024-01-28T09:12:54.531252-08:00,0.7069942145640749 +2024-01-28T09:12:55.531252-08:00,0.22707879474976028 +2024-01-28T09:12:56.531252-08:00,0.957768388142448 +2024-01-28T09:12:57.531252-08:00,0.6144049192190292 +2024-01-28T09:12:58.531252-08:00,0.0873028949386201 +2024-01-28T09:12:59.531252-08:00,0.3039485218345719 +2024-01-28T09:13:00.531252-08:00,0.22851326936800914 +2024-01-28T09:13:01.531252-08:00,0.16260281000247712 +2024-01-28T09:13:02.531252-08:00,0.09552401779445857 +2024-01-28T09:13:03.531252-08:00,0.9537078296985912 +2024-01-28T09:13:04.531252-08:00,0.9122053160758067 +2024-01-28T09:13:05.531252-08:00,0.7902955646688254 diff --git a/python/examples/ingestion_with_csv/multi_csv/channel_b.csv b/python/examples/ingestion_with_csv/multi_csv/channel_b.csv new file mode 100644 index 00000000..025dd5dc --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/channel_b.csv @@ -0,0 +1,20 @@ +time,channel_b +2024-01-28T09:12:45.531252-08:00,0.89109795898564 +2024-01-28T09:12:46.531252-08:00,0.9309029570727708 +2024-01-28T09:12:47.531252-08:00,0.41736205389867254 +2024-01-28T09:12:48.531252-08:00,0.7757735001457383 +2024-01-28T09:12:49.531252-08:00,0.42276349070729996 +2024-01-28T09:12:50.531252-08:00,0.9705230086382642 +2024-01-28T09:12:51.531252-08:00,0.734441844952279 +2024-01-28T09:12:52.531252-08:00,0.1391787659779769 +2024-01-28T09:12:53.531252-08:00,0.8297404689121078 +2024-01-28T09:12:54.531252-08:00,0.3378370313101441 +2024-01-28T09:12:56.531252-08:00,0.5828578086789715 +2024-01-28T09:12:57.531252-08:00,0.40521655584043825 +2024-01-28T09:12:58.531252-08:00,0.7945760115108459 +2024-01-28T09:12:59.531252-08:00,0.20324861252254495 +2024-01-28T09:13:00.531252-08:00,0.8613288704162841 +2024-01-28T09:13:01.531252-08:00,0.3912526730822837 +2024-01-28T09:13:02.531252-08:00,0.6454147210017829 +2024-01-28T09:13:03.531252-08:00,0.7354322452008073 +2024-01-28T09:13:04.531252-08:00,0.9664717997673006 diff --git a/python/examples/ingestion_with_csv/multi_csv/channel_c.csv b/python/examples/ingestion_with_csv/multi_csv/channel_c.csv new file mode 100644 index 00000000..850a658e --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/channel_c.csv @@ -0,0 +1,22 @@ +time,channel_c +2024-01-28T09:12:44.531252-08:00,0.3583413686283572 +2024-01-28T09:12:45.531252-08:00,0.3836948472275413 +2024-01-28T09:12:46.531252-08:00,0.9963335515068289 +2024-01-28T09:12:47.531252-08:00,0.7539072495801836 +2024-01-28T09:12:48.531252-08:00,0.7023922590155645 +2024-01-28T09:12:49.531252-08:00,0.25561715849715744 +2024-01-28T09:12:50.531252-08:00,0.5694787962865526 +2024-01-28T09:12:52.531252-08:00,0.9598539271439543 +2024-01-28T09:12:53.531252-08:00,0.8251028382570161 +2024-01-28T09:12:54.531252-08:00,0.407590907943275 +2024-01-28T09:12:55.531252-08:00,0.2872819025384998 +2024-01-28T09:12:56.531252-08:00,0.5869616167673772 +2024-01-28T09:12:57.531252-08:00,0.9568875633460442 +2024-01-28T09:12:58.531252-08:00,0.703233083472776 +2024-01-28T09:12:59.531252-08:00,0.7322267937013178 +2024-01-28T09:13:00.531252-08:00,0.8209383644559006 +2024-01-28T09:13:01.531252-08:00,0.9151461394004176 +2024-01-28T09:13:02.531252-08:00,0.8486339884028036 +2024-01-28T09:13:03.531252-08:00,0.1565551267309683 +2024-01-28T09:13:04.531252-08:00,0.4669848760366126 +2024-01-28T09:13:05.531252-08:00,0.8402548823816924 diff --git a/python/examples/ingestion_with_csv/multi_csv/main.py b/python/examples/ingestion_with_csv/multi_csv/main.py new file mode 100644 index 00000000..70c4b15e --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/main.py @@ -0,0 +1,122 @@ +import csv +import os +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Tuple + +from dotenv import load_dotenv +from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel +from sift_py.ingestion.channel import ChannelConfig, ChannelDataType, double_value, empty_value +from sift_py.ingestion.config.telemetry import TelemetryConfig +from sift_py.ingestion.flow import FlowConfig, FlowOrderedChannelValues +from sift_py.ingestion.service import IngestionService + + +def parse_csv( + data: List[Dict], telemetry_config: TelemetryConfig +) -> List[FlowOrderedChannelValues]: + flows: List[FlowOrderedChannelValues] = [] + + flow = telemetry_config.flows[0] # Packed into a single flow for this example + flow_name = flow.name + + all_timestamps = [ts for channel in data for ts in channel.keys()] + all_timestamps = sorted(list(set(all_timestamps))) # Remove duplicates and sort + + for timestamp in all_timestamps: + channel_values = [] + + for channel in data: + channel_data = channel.get(timestamp) + # Check if there's data present for each timestamp + if channel_data: + channel_values.append(double_value(float(channel_data))) + else: + channel_values.append(empty_value()) + + flows.append( + { + "flow_name": flow_name, + "timestamp": timestamp, + "channel_values": channel_values, + } + ) + + return flows + + +def load_telemetry_config( + csv_paths: List[Path], asset_name: str, ingestion_client_key: str +) -> Tuple[TelemetryConfig, List[Dict]]: + channels = [] + data: List[Dict] = [] # Each channel will have its own dictionary: {timestamp: value} + + for path_to_csv in csv_paths: + with open(path_to_csv, "r") as csv_file: + reader = csv.reader(csv_file) + header = next(reader) # Grab header + channel_name = header[1] # Assuming only one channel per CSV + + channels.append( + ChannelConfig( + name=channel_name, + data_type=ChannelDataType.DOUBLE, # Assuming all channels are doubles for this example + ) + ) + + channel_data = {} + for row in reader: + timestamp_str, value = row[0], row[1] # Assuming only one channel per CSV + channel_data.update({datetime.fromisoformat(timestamp_str): value}) + data.append(channel_data) + + telemetry_config = TelemetryConfig( + asset_name=asset_name, + ingestion_client_key=ingestion_client_key, + flows=[FlowConfig(name="data", channels=channels)], + ) + return telemetry_config, data + + +if __name__ == "__main__": + """ + Example of ingesting data from a CSV file into Sift. + """ + + load_dotenv() + + sift_uri = os.getenv("SIFT_API_URI") + assert sift_uri, "expected 'SIFT_API_URI' environment variable to be set" + + apikey = os.getenv("SIFT_API_KEY") + assert apikey, "expected 'SIFT_API_KEY' environment variable to be set" + + asset_name = os.getenv("ASSET_NAME") + assert asset_name, "expected 'ASSET_NAME' environment variable to be set" + + ingestion_client_key = os.getenv("INGESTION_CLIENT_KEY") + assert ingestion_client_key, "expected 'INGESTION_CLIENT_KEY' environment variable to be set" + + csv_data = [Path("channel_a.csv"), Path("channel_b.csv"), Path("channel_c.csv")] + + telemetry_config, data = load_telemetry_config(csv_data, asset_name, ingestion_client_key) + flows = parse_csv(data, telemetry_config) + + sift_channel_config = SiftChannelConfig( + uri=sift_uri, + apikey=apikey, + ) + + with use_sift_channel(sift_channel_config) as channel: + # Create ingestion service using the telemetry config + ingestion_service = IngestionService( + channel=channel, + config=telemetry_config, + ) + + # Create a new run as part of this ingestion + run_name = f"{asset_name}-{datetime.now()}" + ingestion_service.attach_run(channel, run_name, "example csv ingestion") + + with ingestion_service.buffered_ingestion() as buffered_ingestion: + buffered_ingestion.ingest_flows(*flows) diff --git a/python/examples/ingestion_with_csv/requirements.txt b/python/examples/ingestion_with_csv/multi_csv/requirements.txt similarity index 100% rename from python/examples/ingestion_with_csv/requirements.txt rename to python/examples/ingestion_with_csv/multi_csv/requirements.txt diff --git a/python/examples/ingestion_with_csv/single_csv/.env-example b/python/examples/ingestion_with_csv/single_csv/.env-example new file mode 100644 index 00000000..0901498f --- /dev/null +++ b/python/examples/ingestion_with_csv/single_csv/.env-example @@ -0,0 +1,4 @@ +SIFT_API_URI="" +SIFT_API_KEY="" +ASSET_NAME="" +INGESTION_CLIENT_KEY="" \ No newline at end of file diff --git a/python/examples/ingestion_with_csv/main.py b/python/examples/ingestion_with_csv/single_csv/main.py similarity index 100% rename from python/examples/ingestion_with_csv/main.py rename to python/examples/ingestion_with_csv/single_csv/main.py diff --git a/python/examples/ingestion_with_csv/single_csv/requirements.txt b/python/examples/ingestion_with_csv/single_csv/requirements.txt new file mode 100644 index 00000000..2dda90fe --- /dev/null +++ b/python/examples/ingestion_with_csv/single_csv/requirements.txt @@ -0,0 +1,2 @@ +python-dotenv +sift-stack-py diff --git a/python/examples/ingestion_with_csv/sample_data.csv b/python/examples/ingestion_with_csv/single_csv/sample_data.csv similarity index 100% rename from python/examples/ingestion_with_csv/sample_data.csv rename to python/examples/ingestion_with_csv/single_csv/sample_data.csv