From b902de0f1d662d50af1033af31f283dd997b52ef Mon Sep 17 00:00:00 2001 From: Ailin Yu Date: Wed, 18 Sep 2024 11:42:26 -0700 Subject: [PATCH 1/6] refactor: move existing csv example into subdir --- python/examples/ingestion_with_csv/{ => single_csv}/.env-example | 0 python/examples/ingestion_with_csv/{ => single_csv}/main.py | 0 .../examples/ingestion_with_csv/{ => single_csv}/requirements.txt | 0 .../examples/ingestion_with_csv/{ => single_csv}/sample_data.csv | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename python/examples/ingestion_with_csv/{ => single_csv}/.env-example (100%) rename python/examples/ingestion_with_csv/{ => single_csv}/main.py (100%) rename python/examples/ingestion_with_csv/{ => single_csv}/requirements.txt (100%) rename python/examples/ingestion_with_csv/{ => single_csv}/sample_data.csv (100%) diff --git a/python/examples/ingestion_with_csv/.env-example b/python/examples/ingestion_with_csv/single_csv/.env-example similarity index 100% rename from python/examples/ingestion_with_csv/.env-example rename to python/examples/ingestion_with_csv/single_csv/.env-example diff --git a/python/examples/ingestion_with_csv/main.py b/python/examples/ingestion_with_csv/single_csv/main.py similarity index 100% rename from python/examples/ingestion_with_csv/main.py rename to python/examples/ingestion_with_csv/single_csv/main.py diff --git a/python/examples/ingestion_with_csv/requirements.txt b/python/examples/ingestion_with_csv/single_csv/requirements.txt similarity index 100% rename from python/examples/ingestion_with_csv/requirements.txt rename to python/examples/ingestion_with_csv/single_csv/requirements.txt diff --git a/python/examples/ingestion_with_csv/sample_data.csv b/python/examples/ingestion_with_csv/single_csv/sample_data.csv similarity index 100% rename from python/examples/ingestion_with_csv/sample_data.csv rename to python/examples/ingestion_with_csv/single_csv/sample_data.csv From f33783b44bd6c1c1d7da1d3d853e2442fd343806 Mon Sep 17 00:00:00 2001 From: Ailin Yu Date: Wed, 18 Sep 2024 13:26:19 -0700 Subject: [PATCH 2/6] chore: Multi-csv example --- .../ingestion_with_csv/multi_csv/.env-example | 4 + .../multi_csv/channel_a.csv | 23 ++++ .../multi_csv/channel_b.csv | 20 +++ .../multi_csv/channel_c.csv | 22 ++++ .../ingestion_with_csv/multi_csv/main.py | 124 ++++++++++++++++++ .../multi_csv/requirements.txt | 2 + 6 files changed, 195 insertions(+) create mode 100644 python/examples/ingestion_with_csv/multi_csv/.env-example create mode 100644 python/examples/ingestion_with_csv/multi_csv/channel_a.csv create mode 100644 python/examples/ingestion_with_csv/multi_csv/channel_b.csv create mode 100644 python/examples/ingestion_with_csv/multi_csv/channel_c.csv create mode 100644 python/examples/ingestion_with_csv/multi_csv/main.py create mode 100644 python/examples/ingestion_with_csv/multi_csv/requirements.txt diff --git a/python/examples/ingestion_with_csv/multi_csv/.env-example b/python/examples/ingestion_with_csv/multi_csv/.env-example new file mode 100644 index 00000000..0901498f --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/.env-example @@ -0,0 +1,4 @@ +SIFT_API_URI="" +SIFT_API_KEY="" +ASSET_NAME="" +INGESTION_CLIENT_KEY="" \ No newline at end of file diff --git a/python/examples/ingestion_with_csv/multi_csv/channel_a.csv b/python/examples/ingestion_with_csv/multi_csv/channel_a.csv new file mode 100644 index 00000000..10f7099a --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/channel_a.csv @@ -0,0 +1,23 @@ +time,channel_a +2024-01-28T09:12:44.531252-08:00,0.22632408240649787 +2024-01-28T09:12:45.531252-08:00,0.2507473982956536 +2024-01-28T09:12:46.531252-08:00,0.569768335603046 +2024-01-28T09:12:47.531252-08:00,0.11603978260515428 +2024-01-28T09:12:48.531252-08:00,0.7769021894063354 +2024-01-28T09:12:49.531252-08:00,0.925130023325051 +2024-01-28T09:12:50.531252-08:00,0.5742981300587023 +2024-01-28T09:12:51.531252-08:00,0.5838714673841429 +2024-01-28T09:12:52.531252-08:00,0.6445476967910116 +2024-01-28T09:12:53.531252-08:00,0.6168050494395134 +2024-01-28T09:12:54.531252-08:00,0.7069942145640749 +2024-01-28T09:12:55.531252-08:00,0.22707879474976028 +2024-01-28T09:12:56.531252-08:00,0.957768388142448 +2024-01-28T09:12:57.531252-08:00,0.6144049192190292 +2024-01-28T09:12:58.531252-08:00,0.0873028949386201 +2024-01-28T09:12:59.531252-08:00,0.3039485218345719 +2024-01-28T09:13:00.531252-08:00,0.22851326936800914 +2024-01-28T09:13:01.531252-08:00,0.16260281000247712 +2024-01-28T09:13:02.531252-08:00,0.09552401779445857 +2024-01-28T09:13:03.531252-08:00,0.9537078296985912 +2024-01-28T09:13:04.531252-08:00,0.9122053160758067 +2024-01-28T09:13:05.531252-08:00,0.7902955646688254 diff --git a/python/examples/ingestion_with_csv/multi_csv/channel_b.csv b/python/examples/ingestion_with_csv/multi_csv/channel_b.csv new file mode 100644 index 00000000..025dd5dc --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/channel_b.csv @@ -0,0 +1,20 @@ +time,channel_b +2024-01-28T09:12:45.531252-08:00,0.89109795898564 +2024-01-28T09:12:46.531252-08:00,0.9309029570727708 +2024-01-28T09:12:47.531252-08:00,0.41736205389867254 +2024-01-28T09:12:48.531252-08:00,0.7757735001457383 +2024-01-28T09:12:49.531252-08:00,0.42276349070729996 +2024-01-28T09:12:50.531252-08:00,0.9705230086382642 +2024-01-28T09:12:51.531252-08:00,0.734441844952279 +2024-01-28T09:12:52.531252-08:00,0.1391787659779769 +2024-01-28T09:12:53.531252-08:00,0.8297404689121078 +2024-01-28T09:12:54.531252-08:00,0.3378370313101441 +2024-01-28T09:12:56.531252-08:00,0.5828578086789715 +2024-01-28T09:12:57.531252-08:00,0.40521655584043825 +2024-01-28T09:12:58.531252-08:00,0.7945760115108459 +2024-01-28T09:12:59.531252-08:00,0.20324861252254495 +2024-01-28T09:13:00.531252-08:00,0.8613288704162841 +2024-01-28T09:13:01.531252-08:00,0.3912526730822837 +2024-01-28T09:13:02.531252-08:00,0.6454147210017829 +2024-01-28T09:13:03.531252-08:00,0.7354322452008073 +2024-01-28T09:13:04.531252-08:00,0.9664717997673006 diff --git a/python/examples/ingestion_with_csv/multi_csv/channel_c.csv b/python/examples/ingestion_with_csv/multi_csv/channel_c.csv new file mode 100644 index 00000000..850a658e --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/channel_c.csv @@ -0,0 +1,22 @@ +time,channel_c +2024-01-28T09:12:44.531252-08:00,0.3583413686283572 +2024-01-28T09:12:45.531252-08:00,0.3836948472275413 +2024-01-28T09:12:46.531252-08:00,0.9963335515068289 +2024-01-28T09:12:47.531252-08:00,0.7539072495801836 +2024-01-28T09:12:48.531252-08:00,0.7023922590155645 +2024-01-28T09:12:49.531252-08:00,0.25561715849715744 +2024-01-28T09:12:50.531252-08:00,0.5694787962865526 +2024-01-28T09:12:52.531252-08:00,0.9598539271439543 +2024-01-28T09:12:53.531252-08:00,0.8251028382570161 +2024-01-28T09:12:54.531252-08:00,0.407590907943275 +2024-01-28T09:12:55.531252-08:00,0.2872819025384998 +2024-01-28T09:12:56.531252-08:00,0.5869616167673772 +2024-01-28T09:12:57.531252-08:00,0.9568875633460442 +2024-01-28T09:12:58.531252-08:00,0.703233083472776 +2024-01-28T09:12:59.531252-08:00,0.7322267937013178 +2024-01-28T09:13:00.531252-08:00,0.8209383644559006 +2024-01-28T09:13:01.531252-08:00,0.9151461394004176 +2024-01-28T09:13:02.531252-08:00,0.8486339884028036 +2024-01-28T09:13:03.531252-08:00,0.1565551267309683 +2024-01-28T09:13:04.531252-08:00,0.4669848760366126 +2024-01-28T09:13:05.531252-08:00,0.8402548823816924 diff --git a/python/examples/ingestion_with_csv/multi_csv/main.py b/python/examples/ingestion_with_csv/multi_csv/main.py new file mode 100644 index 00000000..857bc817 --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/main.py @@ -0,0 +1,124 @@ +import csv +import os +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Tuple + +from dotenv import load_dotenv +from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel +from sift_py.ingestion.channel import ChannelConfig, ChannelDataType, double_value, empty_value +from sift_py.ingestion.config.telemetry import TelemetryConfig +from sift_py.ingestion.flow import FlowConfig, FlowOrderedChannelValues +from sift_py.ingestion.service import IngestionService + + +def parse_csv( + data: List[Dict], telemetry_config: TelemetryConfig +) -> List[FlowOrderedChannelValues]: + flows: List[FlowOrderedChannelValues] = [] + + flow = telemetry_config.flows[0] # Packed into a single flow for this example + flow_name = flow.name + + all_timestamps: List = [] + for channel in data: + all_timestamps += channel.keys() + + for timestamp in sorted(list(set(all_timestamps))): + channel_values = [] + for channel in data: + channel_data = channel.get(timestamp) + if channel_data: + channel_values.append(double_value(float(channel_data))) + else: + channel_values.append(empty_value()) + + flows.append( + { + "flow_name": flow_name, + "timestamp": timestamp, + "channel_values": channel_values, + } + ) + + return flows + + +def load_telemetry_config( + csv_paths: List[Path], asset_name: str, ingestion_client_key: str +) -> Tuple[TelemetryConfig, List[Dict]]: + channels = [] + data: List[Dict] = [] + + for path_to_csv in csv_paths: + with open(path_to_csv, "r") as csv_file: + reader = csv.reader(csv_file) + header = next(reader) # Grab header + channel_name = header[1] # Assuming only one channel per CSV + + channels.append( + ChannelConfig( + name=channel_name, + data_type=ChannelDataType.DOUBLE, # Assuming all channels are doubles for this example + ) + ) + + channel_data = {} + for row in reader: + timestamp_str, value = row[0], row[1] # Assuming only one channel per CSV + channel_data.update({ + datetime.fromisoformat(timestamp_str): + value + }) + data.append(channel_data) + + telemetry_config = TelemetryConfig( + asset_name=asset_name, + ingestion_client_key=ingestion_client_key, + flows=[FlowConfig(name="data", channels=channels)], + ) + return telemetry_config, data + + +if __name__ == "__main__": + """ + Example of ingesting data from a CSV file into Sift. + """ + + load_dotenv() + + sift_uri = os.getenv("SIFT_API_URI") + assert sift_uri, "expected 'SIFT_API_URI' environment variable to be set" + + apikey = os.getenv("SIFT_API_KEY") + assert apikey, "expected 'SIFT_API_KEY' environment variable to be set" + + asset_name = os.getenv("ASSET_NAME") + assert asset_name, "expected 'ASSET_NAME' environment variable to be set" + + ingestion_client_key = os.getenv("INGESTION_CLIENT_KEY") + assert ingestion_client_key, "expected 'INGESTION_CLIENT_KEY' environment variable to be set" + + csv_data = [Path("channel_a.csv"), Path("channel_b.csv"), Path("channel_c.csv")] + + telemetry_config, data = load_telemetry_config(csv_data, asset_name, ingestion_client_key) + flows = parse_csv(data, telemetry_config) + + sift_channel_config = SiftChannelConfig( + uri=sift_uri, + apikey=apikey, + ) + + with use_sift_channel(sift_channel_config) as channel: + # Create ingestion service using the telemetry config + ingestion_service = IngestionService( + channel=channel, + config=telemetry_config, + ) + + # Create a new run as part of this ingestion + run_name = f"{asset_name}-{datetime.now()}" + ingestion_service.attach_run(channel, run_name, "example csv ingestion") + + with ingestion_service.buffered_ingestion() as buffered_ingestion: + buffered_ingestion.ingest_flows(*flows) diff --git a/python/examples/ingestion_with_csv/multi_csv/requirements.txt b/python/examples/ingestion_with_csv/multi_csv/requirements.txt new file mode 100644 index 00000000..2dda90fe --- /dev/null +++ b/python/examples/ingestion_with_csv/multi_csv/requirements.txt @@ -0,0 +1,2 @@ +python-dotenv +sift-stack-py From 0c722a9d4013d6ef7adb1bac0457ec44e0e4c46a Mon Sep 17 00:00:00 2001 From: Ailin Yu Date: Wed, 18 Sep 2024 13:29:09 -0700 Subject: [PATCH 3/6] chore: Add some comments --- python/examples/ingestion_with_csv/multi_csv/main.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/examples/ingestion_with_csv/multi_csv/main.py b/python/examples/ingestion_with_csv/multi_csv/main.py index 857bc817..a9fb1400 100644 --- a/python/examples/ingestion_with_csv/multi_csv/main.py +++ b/python/examples/ingestion_with_csv/multi_csv/main.py @@ -21,13 +21,16 @@ def parse_csv( flow_name = flow.name all_timestamps: List = [] - for channel in data: + for channel in data: # Extract all timestamps from all channels all_timestamps += channel.keys() + all_timestamps = sorted(list(set(all_timestamps))) # Remove duplicates and sort - for timestamp in sorted(list(set(all_timestamps))): + for timestamp in all_timestamps: channel_values = [] + for channel in data: channel_data = channel.get(timestamp) + # Check if there's data present for each timestamp if channel_data: channel_values.append(double_value(float(channel_data))) else: @@ -48,7 +51,7 @@ def load_telemetry_config( csv_paths: List[Path], asset_name: str, ingestion_client_key: str ) -> Tuple[TelemetryConfig, List[Dict]]: channels = [] - data: List[Dict] = [] + data: List[Dict] = [] # Each channel will have its own dictionary: {timestamp: value} for path_to_csv in csv_paths: with open(path_to_csv, "r") as csv_file: From cec2f790d869fba421cd92532673d38b960ee8ed Mon Sep 17 00:00:00 2001 From: Ailin Yu Date: Wed, 18 Sep 2024 13:33:20 -0700 Subject: [PATCH 4/6] chore: format --- python/examples/ingestion_with_csv/multi_csv/main.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/examples/ingestion_with_csv/multi_csv/main.py b/python/examples/ingestion_with_csv/multi_csv/main.py index a9fb1400..576031fd 100644 --- a/python/examples/ingestion_with_csv/multi_csv/main.py +++ b/python/examples/ingestion_with_csv/multi_csv/main.py @@ -68,11 +68,8 @@ def load_telemetry_config( channel_data = {} for row in reader: - timestamp_str, value = row[0], row[1] # Assuming only one channel per CSV - channel_data.update({ - datetime.fromisoformat(timestamp_str): - value - }) + timestamp_str, value = row[0], row[1] # Assuming only one channel per CSV + channel_data.update({datetime.fromisoformat(timestamp_str): value}) data.append(channel_data) telemetry_config = TelemetryConfig( From 6743ec57643989a3df2666efdd94364d167e996c Mon Sep 17 00:00:00 2001 From: Ailin Yu Date: Thu, 19 Sep 2024 12:05:20 -0500 Subject: [PATCH 5/6] chore: PR feedback --- python/examples/ingestion_with_csv/multi_csv/main.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/examples/ingestion_with_csv/multi_csv/main.py b/python/examples/ingestion_with_csv/multi_csv/main.py index 576031fd..40d575e6 100644 --- a/python/examples/ingestion_with_csv/multi_csv/main.py +++ b/python/examples/ingestion_with_csv/multi_csv/main.py @@ -20,9 +20,11 @@ def parse_csv( flow = telemetry_config.flows[0] # Packed into a single flow for this example flow_name = flow.name - all_timestamps: List = [] - for channel in data: # Extract all timestamps from all channels - all_timestamps += channel.keys() + all_timestamps = [ + ts + for channel in data + for ts in channel.keys() + ] all_timestamps = sorted(list(set(all_timestamps))) # Remove duplicates and sort for timestamp in all_timestamps: From 17fad9c0d8d1d06765b67bfca03cf390b802f384 Mon Sep 17 00:00:00 2001 From: Ailin Yu Date: Thu, 19 Sep 2024 13:23:08 -0500 Subject: [PATCH 6/6] chore: fmt --- python/examples/ingestion_with_csv/multi_csv/main.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/examples/ingestion_with_csv/multi_csv/main.py b/python/examples/ingestion_with_csv/multi_csv/main.py index 40d575e6..70c4b15e 100644 --- a/python/examples/ingestion_with_csv/multi_csv/main.py +++ b/python/examples/ingestion_with_csv/multi_csv/main.py @@ -20,11 +20,7 @@ def parse_csv( flow = telemetry_config.flows[0] # Packed into a single flow for this example flow_name = flow.name - all_timestamps = [ - ts - for channel in data - for ts in channel.keys() - ] + all_timestamps = [ts for channel in data for ts in channel.keys()] all_timestamps = sorted(list(set(all_timestamps))) # Remove duplicates and sort for timestamp in all_timestamps: