-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
python(chore): Add multiple CSV ingestion example (#98)
- Loading branch information
Showing
10 changed files
with
193 additions
and
0 deletions.
There are no files selected for viewing
File renamed without changes.
23 changes: 23 additions & 0 deletions
23
python/examples/ingestion_with_csv/multi_csv/channel_a.csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
time,channel_a | ||
2024-01-28T09:12:44.531252-08:00,0.22632408240649787 | ||
2024-01-28T09:12:45.531252-08:00,0.2507473982956536 | ||
2024-01-28T09:12:46.531252-08:00,0.569768335603046 | ||
2024-01-28T09:12:47.531252-08:00,0.11603978260515428 | ||
2024-01-28T09:12:48.531252-08:00,0.7769021894063354 | ||
2024-01-28T09:12:49.531252-08:00,0.925130023325051 | ||
2024-01-28T09:12:50.531252-08:00,0.5742981300587023 | ||
2024-01-28T09:12:51.531252-08:00,0.5838714673841429 | ||
2024-01-28T09:12:52.531252-08:00,0.6445476967910116 | ||
2024-01-28T09:12:53.531252-08:00,0.6168050494395134 | ||
2024-01-28T09:12:54.531252-08:00,0.7069942145640749 | ||
2024-01-28T09:12:55.531252-08:00,0.22707879474976028 | ||
2024-01-28T09:12:56.531252-08:00,0.957768388142448 | ||
2024-01-28T09:12:57.531252-08:00,0.6144049192190292 | ||
2024-01-28T09:12:58.531252-08:00,0.0873028949386201 | ||
2024-01-28T09:12:59.531252-08:00,0.3039485218345719 | ||
2024-01-28T09:13:00.531252-08:00,0.22851326936800914 | ||
2024-01-28T09:13:01.531252-08:00,0.16260281000247712 | ||
2024-01-28T09:13:02.531252-08:00,0.09552401779445857 | ||
2024-01-28T09:13:03.531252-08:00,0.9537078296985912 | ||
2024-01-28T09:13:04.531252-08:00,0.9122053160758067 | ||
2024-01-28T09:13:05.531252-08:00,0.7902955646688254 |
20 changes: 20 additions & 0 deletions
20
python/examples/ingestion_with_csv/multi_csv/channel_b.csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
time,channel_b | ||
2024-01-28T09:12:45.531252-08:00,0.89109795898564 | ||
2024-01-28T09:12:46.531252-08:00,0.9309029570727708 | ||
2024-01-28T09:12:47.531252-08:00,0.41736205389867254 | ||
2024-01-28T09:12:48.531252-08:00,0.7757735001457383 | ||
2024-01-28T09:12:49.531252-08:00,0.42276349070729996 | ||
2024-01-28T09:12:50.531252-08:00,0.9705230086382642 | ||
2024-01-28T09:12:51.531252-08:00,0.734441844952279 | ||
2024-01-28T09:12:52.531252-08:00,0.1391787659779769 | ||
2024-01-28T09:12:53.531252-08:00,0.8297404689121078 | ||
2024-01-28T09:12:54.531252-08:00,0.3378370313101441 | ||
2024-01-28T09:12:56.531252-08:00,0.5828578086789715 | ||
2024-01-28T09:12:57.531252-08:00,0.40521655584043825 | ||
2024-01-28T09:12:58.531252-08:00,0.7945760115108459 | ||
2024-01-28T09:12:59.531252-08:00,0.20324861252254495 | ||
2024-01-28T09:13:00.531252-08:00,0.8613288704162841 | ||
2024-01-28T09:13:01.531252-08:00,0.3912526730822837 | ||
2024-01-28T09:13:02.531252-08:00,0.6454147210017829 | ||
2024-01-28T09:13:03.531252-08:00,0.7354322452008073 | ||
2024-01-28T09:13:04.531252-08:00,0.9664717997673006 |
22 changes: 22 additions & 0 deletions
22
python/examples/ingestion_with_csv/multi_csv/channel_c.csv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
time,channel_c | ||
2024-01-28T09:12:44.531252-08:00,0.3583413686283572 | ||
2024-01-28T09:12:45.531252-08:00,0.3836948472275413 | ||
2024-01-28T09:12:46.531252-08:00,0.9963335515068289 | ||
2024-01-28T09:12:47.531252-08:00,0.7539072495801836 | ||
2024-01-28T09:12:48.531252-08:00,0.7023922590155645 | ||
2024-01-28T09:12:49.531252-08:00,0.25561715849715744 | ||
2024-01-28T09:12:50.531252-08:00,0.5694787962865526 | ||
2024-01-28T09:12:52.531252-08:00,0.9598539271439543 | ||
2024-01-28T09:12:53.531252-08:00,0.8251028382570161 | ||
2024-01-28T09:12:54.531252-08:00,0.407590907943275 | ||
2024-01-28T09:12:55.531252-08:00,0.2872819025384998 | ||
2024-01-28T09:12:56.531252-08:00,0.5869616167673772 | ||
2024-01-28T09:12:57.531252-08:00,0.9568875633460442 | ||
2024-01-28T09:12:58.531252-08:00,0.703233083472776 | ||
2024-01-28T09:12:59.531252-08:00,0.7322267937013178 | ||
2024-01-28T09:13:00.531252-08:00,0.8209383644559006 | ||
2024-01-28T09:13:01.531252-08:00,0.9151461394004176 | ||
2024-01-28T09:13:02.531252-08:00,0.8486339884028036 | ||
2024-01-28T09:13:03.531252-08:00,0.1565551267309683 | ||
2024-01-28T09:13:04.531252-08:00,0.4669848760366126 | ||
2024-01-28T09:13:05.531252-08:00,0.8402548823816924 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import csv | ||
import os | ||
from datetime import datetime | ||
from pathlib import Path | ||
from typing import Dict, List, Tuple | ||
|
||
from dotenv import load_dotenv | ||
from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel | ||
from sift_py.ingestion.channel import ChannelConfig, ChannelDataType, double_value, empty_value | ||
from sift_py.ingestion.config.telemetry import TelemetryConfig | ||
from sift_py.ingestion.flow import FlowConfig, FlowOrderedChannelValues | ||
from sift_py.ingestion.service import IngestionService | ||
|
||
|
||
def parse_csv(
    data: List[Dict], telemetry_config: TelemetryConfig
) -> List[FlowOrderedChannelValues]:
    """Merge per-channel samples into one flow entry per distinct timestamp.

    Args:
        data: One dictionary per channel, mapping timestamp -> raw value.
            The order of the dictionaries determines the order of the
            values inside each entry's ``channel_values`` list.
        telemetry_config: Telemetry config whose first flow supplies the
            flow name used for every entry.

    Returns:
        A list of dictionaries with the keys ``flow_name``, ``timestamp``
        and ``channel_values``, sorted by ascending timestamp. A channel
        with no (or a falsy) sample at a given timestamp contributes
        ``empty_value()``; otherwise its sample is converted with
        ``double_value(float(...))``.
    """
    # Everything is packed into a single flow for this example.
    flow_name = telemetry_config.flows[0].name

    # Union of the timestamps seen in any channel, de-duplicated and sorted.
    timeline = sorted({stamp for samples in data for stamp in samples})

    def to_channel_value(raw):
        # A channel may have no sample at a timestamp another channel has.
        return double_value(float(raw)) if raw else empty_value()

    return [
        {
            "flow_name": flow_name,
            "timestamp": stamp,
            "channel_values": [to_channel_value(samples.get(stamp)) for samples in data],
        }
        for stamp in timeline
    ]
|
||
|
||
def load_telemetry_config(
    csv_paths: List[Path], asset_name: str, ingestion_client_key: str
) -> Tuple[TelemetryConfig, List[Dict]]:
    """Build a telemetry config and load the samples from single-channel CSV files.

    Each CSV is expected to have a header row followed by data rows, where
    the first column is an ISO 8601 timestamp and the second column is the
    channel value. The channel name is taken from the second header column.

    Args:
        csv_paths: Paths of the CSV files to read, one channel per file.
        asset_name: Asset name passed through to the ``TelemetryConfig``.
        ingestion_client_key: Client key passed through to the
            ``TelemetryConfig``.

    Returns:
        A tuple of the telemetry config (a single flow named ``"data"``
        holding one double channel per CSV, in ``csv_paths`` order) and a
        list with one ``{timestamp: raw value string}`` dictionary per CSV,
        in the same order.

    Raises:
        ValueError: If a CSV file is empty or its header has fewer than
            two columns.
    """
    channels = []
    data: List[Dict] = []  # Each channel will have its own dictionary: {timestamp: value}

    for path_to_csv in csv_paths:
        # newline="" lets the csv module do its own newline handling, as its
        # documentation requires for file objects passed to csv.reader.
        with open(path_to_csv, "r", newline="") as csv_file:
            reader = csv.reader(csv_file)

            header = next(reader, None)
            if header is None or len(header) < 2:
                raise ValueError(
                    f"{path_to_csv}: expected a header row with a time column and a channel column"
                )
            channel_name = header[1]  # Assuming only one channel per CSV

            channels.append(
                ChannelConfig(
                    name=channel_name,
                    data_type=ChannelDataType.DOUBLE,  # Assuming all channels are doubles for this example
                )
            )

            channel_data: Dict = {}
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines (e.g. a trailing newline).
                    continue
                timestamp_str, value = row[0], row[1]  # Assuming only one channel per CSV
                channel_data[datetime.fromisoformat(timestamp_str)] = value
            data.append(channel_data)

    telemetry_config = TelemetryConfig(
        asset_name=asset_name,
        ingestion_client_key=ingestion_client_key,
        flows=[FlowConfig(name="data", channels=channels)],
    )
    return telemetry_config, data
|
||
|
||
if __name__ == "__main__":
    # Example of ingesting data from multiple CSV files (one channel per
    # file) into Sift.

    load_dotenv()

    # Validate the configuration explicitly rather than with `assert`, which
    # is stripped when Python runs with -O and would let a missing value
    # slip through to the client calls below.
    required_env = ("SIFT_API_URI", "SIFT_API_KEY", "ASSET_NAME", "INGESTION_CLIENT_KEY")
    env = {name: os.getenv(name, "") for name in required_env}
    for name, value in env.items():
        if not value:
            raise SystemExit(f"expected '{name}' environment variable to be set")

    sift_uri = env["SIFT_API_URI"]
    apikey = env["SIFT_API_KEY"]
    asset_name = env["ASSET_NAME"]
    ingestion_client_key = env["INGESTION_CLIENT_KEY"]

    # Relative paths: these are resolved against the current working directory.
    csv_data = [Path("channel_a.csv"), Path("channel_b.csv"), Path("channel_c.csv")]

    telemetry_config, data = load_telemetry_config(csv_data, asset_name, ingestion_client_key)
    flows = parse_csv(data, telemetry_config)

    sift_channel_config = SiftChannelConfig(
        uri=sift_uri,
        apikey=apikey,
    )

    with use_sift_channel(sift_channel_config) as channel:
        # Create ingestion service using the telemetry config
        ingestion_service = IngestionService(
            channel=channel,
            config=telemetry_config,
        )

        # Create a new run as part of this ingestion
        run_name = f"{asset_name}-{datetime.now()}"
        ingestion_service.attach_run(channel, run_name, "example csv ingestion")

        with ingestion_service.buffered_ingestion() as buffered_ingestion:
            buffered_ingestion.ingest_flows(*flows)
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
SIFT_API_URI="" | ||
SIFT_API_KEY="" | ||
ASSET_NAME="" | ||
INGESTION_CLIENT_KEY="" |
File renamed without changes.
2 changes: 2 additions & 0 deletions
2
python/examples/ingestion_with_csv/single_csv/requirements.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
python-dotenv | ||
sift-stack-py |
File renamed without changes.