Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python(chore): Add multiple CSV ingestion example #98

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions python/examples/ingestion_with_csv/multi_csv/channel_a.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
time,channel_a
2024-01-28T09:12:44.531252-08:00,0.22632408240649787
2024-01-28T09:12:45.531252-08:00,0.2507473982956536
2024-01-28T09:12:46.531252-08:00,0.569768335603046
2024-01-28T09:12:47.531252-08:00,0.11603978260515428
2024-01-28T09:12:48.531252-08:00,0.7769021894063354
2024-01-28T09:12:49.531252-08:00,0.925130023325051
2024-01-28T09:12:50.531252-08:00,0.5742981300587023
2024-01-28T09:12:51.531252-08:00,0.5838714673841429
2024-01-28T09:12:52.531252-08:00,0.6445476967910116
2024-01-28T09:12:53.531252-08:00,0.6168050494395134
2024-01-28T09:12:54.531252-08:00,0.7069942145640749
2024-01-28T09:12:55.531252-08:00,0.22707879474976028
2024-01-28T09:12:56.531252-08:00,0.957768388142448
2024-01-28T09:12:57.531252-08:00,0.6144049192190292
2024-01-28T09:12:58.531252-08:00,0.0873028949386201
2024-01-28T09:12:59.531252-08:00,0.3039485218345719
2024-01-28T09:13:00.531252-08:00,0.22851326936800914
2024-01-28T09:13:01.531252-08:00,0.16260281000247712
2024-01-28T09:13:02.531252-08:00,0.09552401779445857
2024-01-28T09:13:03.531252-08:00,0.9537078296985912
2024-01-28T09:13:04.531252-08:00,0.9122053160758067
2024-01-28T09:13:05.531252-08:00,0.7902955646688254
20 changes: 20 additions & 0 deletions python/examples/ingestion_with_csv/multi_csv/channel_b.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
time,channel_b
2024-01-28T09:12:45.531252-08:00,0.89109795898564
2024-01-28T09:12:46.531252-08:00,0.9309029570727708
2024-01-28T09:12:47.531252-08:00,0.41736205389867254
2024-01-28T09:12:48.531252-08:00,0.7757735001457383
2024-01-28T09:12:49.531252-08:00,0.42276349070729996
2024-01-28T09:12:50.531252-08:00,0.9705230086382642
2024-01-28T09:12:51.531252-08:00,0.734441844952279
2024-01-28T09:12:52.531252-08:00,0.1391787659779769
2024-01-28T09:12:53.531252-08:00,0.8297404689121078
2024-01-28T09:12:54.531252-08:00,0.3378370313101441
2024-01-28T09:12:56.531252-08:00,0.5828578086789715
2024-01-28T09:12:57.531252-08:00,0.40521655584043825
2024-01-28T09:12:58.531252-08:00,0.7945760115108459
2024-01-28T09:12:59.531252-08:00,0.20324861252254495
2024-01-28T09:13:00.531252-08:00,0.8613288704162841
2024-01-28T09:13:01.531252-08:00,0.3912526730822837
2024-01-28T09:13:02.531252-08:00,0.6454147210017829
2024-01-28T09:13:03.531252-08:00,0.7354322452008073
2024-01-28T09:13:04.531252-08:00,0.9664717997673006
22 changes: 22 additions & 0 deletions python/examples/ingestion_with_csv/multi_csv/channel_c.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
time,channel_c
2024-01-28T09:12:44.531252-08:00,0.3583413686283572
2024-01-28T09:12:45.531252-08:00,0.3836948472275413
2024-01-28T09:12:46.531252-08:00,0.9963335515068289
2024-01-28T09:12:47.531252-08:00,0.7539072495801836
2024-01-28T09:12:48.531252-08:00,0.7023922590155645
2024-01-28T09:12:49.531252-08:00,0.25561715849715744
2024-01-28T09:12:50.531252-08:00,0.5694787962865526
2024-01-28T09:12:52.531252-08:00,0.9598539271439543
2024-01-28T09:12:53.531252-08:00,0.8251028382570161
2024-01-28T09:12:54.531252-08:00,0.407590907943275
2024-01-28T09:12:55.531252-08:00,0.2872819025384998
2024-01-28T09:12:56.531252-08:00,0.5869616167673772
2024-01-28T09:12:57.531252-08:00,0.9568875633460442
2024-01-28T09:12:58.531252-08:00,0.703233083472776
2024-01-28T09:12:59.531252-08:00,0.7322267937013178
2024-01-28T09:13:00.531252-08:00,0.8209383644559006
2024-01-28T09:13:01.531252-08:00,0.9151461394004176
2024-01-28T09:13:02.531252-08:00,0.8486339884028036
2024-01-28T09:13:03.531252-08:00,0.1565551267309683
2024-01-28T09:13:04.531252-08:00,0.4669848760366126
2024-01-28T09:13:05.531252-08:00,0.8402548823816924
122 changes: 122 additions & 0 deletions python/examples/ingestion_with_csv/multi_csv/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import csv
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple

from dotenv import load_dotenv
from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import ChannelConfig, ChannelDataType, double_value, empty_value
from sift_py.ingestion.config.telemetry import TelemetryConfig
from sift_py.ingestion.flow import FlowConfig, FlowOrderedChannelValues
from sift_py.ingestion.service import IngestionService


def parse_csv(
    data: List[Dict], telemetry_config: TelemetryConfig
) -> List[FlowOrderedChannelValues]:
    """Merge per-channel readings into one time-ordered list of flow payloads.

    Args:
        data: One ``{timestamp: raw_value}`` mapping per channel, given in the
            same order as the channels of the flow being populated.
        telemetry_config: Telemetry config; only the name of its first flow
            is read, since this example packs every channel into that flow.

    Returns:
        One payload per distinct timestamp found across all channels, sorted
        by timestamp. Each payload carries a value for every channel: a
        double when that channel has a non-empty reading at the timestamp,
        otherwise an empty value.
    """
    flow_name = telemetry_config.flows[0].name

    # Union of every timestamp seen in any channel; a set drops duplicates.
    unique_timestamps = set()
    for series in data:
        unique_timestamps.update(series)

    def to_channel_value(raw):
        # A missing timestamp (None) and an empty cell ("") both count as "no data".
        return double_value(float(raw)) if raw else empty_value()

    return [
        {
            "flow_name": flow_name,
            "timestamp": moment,
            "channel_values": [to_channel_value(series.get(moment)) for series in data],
        }
        for moment in sorted(unique_timestamps)
    ]


def load_telemetry_config(
    csv_paths: List[Path], asset_name: str, ingestion_client_key: str
) -> Tuple[TelemetryConfig, List[Dict]]:
    """Build a telemetry config and load channel data from single-channel CSV files.

    Each CSV is read as one header row followed by data rows. The second
    header column is used as the channel name; in every data row the first
    column is parsed with ``datetime.fromisoformat`` and the second column is
    kept as the raw (string) value. Columns beyond the second are ignored.
    Every channel is declared as a double and all channels are placed in a
    single flow named ``"data"``.

    Args:
        csv_paths: Paths of the CSV files to read, one channel per file.
        asset_name: Asset name stored on the returned telemetry config.
        ingestion_client_key: Client key stored on the returned telemetry config.

    Returns:
        A tuple ``(telemetry_config, data)`` where ``data`` holds one
        ``{timestamp: raw_value}`` dict per CSV, in the same order as the
        channels of the config's flow.

    Raises:
        ValueError: If a CSV is empty, its header has fewer than two columns,
            a non-blank row has fewer than two columns, or a timestamp is not
            valid ISO 8601 (raised by ``datetime.fromisoformat``).
        OSError: If a CSV file cannot be opened.
    """
    channels = []
    data: List[Dict] = []  # Each channel will have its own dictionary: {timestamp: value}

    for path_to_csv in csv_paths:
        # newline="" hands line-ending handling to the csv module, as its docs require.
        with open(path_to_csv, "r", newline="") as csv_file:
            reader = csv.reader(csv_file)

            header = next(reader, None)  # None instead of StopIteration on an empty file
            if header is None or len(header) < 2:
                raise ValueError(
                    f"{path_to_csv}: expected a header row with a time column and a channel column"
                )
            channel_name = header[1]  # Assuming only one channel per CSV

            channels.append(
                ChannelConfig(
                    name=channel_name,
                    data_type=ChannelDataType.DOUBLE,  # Assuming all channels are doubles for this example
                )
            )

            channel_data: Dict[datetime, str] = {}
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines (e.g. a trailing newline); skip them.
                    continue
                if len(row) < 2:
                    raise ValueError(
                        f"{path_to_csv}:{reader.line_num}: expected a timestamp and a value, got {row!r}"
                    )
                timestamp_str, value = row[0], row[1]  # Assuming only one channel per CSV
                channel_data[datetime.fromisoformat(timestamp_str)] = value

        data.append(channel_data)

    telemetry_config = TelemetryConfig(
        asset_name=asset_name,
        ingestion_client_key=ingestion_client_key,
        flows=[FlowConfig(name="data", channels=channels)],
    )
    return telemetry_config, data


if __name__ == "__main__":
    """
    Example of ingesting data from multiple single-channel CSV files into Sift.
    """

    load_dotenv()

    # Validate with explicit checks rather than `assert`: assertions are stripped
    # when Python runs with -O, which would let a missing variable through as None.
    sift_uri = os.getenv("SIFT_API_URI")
    if not sift_uri:
        raise SystemExit("expected 'SIFT_API_URI' environment variable to be set")

    apikey = os.getenv("SIFT_API_KEY")
    if not apikey:
        raise SystemExit("expected 'SIFT_API_KEY' environment variable to be set")

    asset_name = os.getenv("ASSET_NAME")
    if not asset_name:
        raise SystemExit("expected 'ASSET_NAME' environment variable to be set")

    ingestion_client_key = os.getenv("INGESTION_CLIENT_KEY")
    if not ingestion_client_key:
        raise SystemExit("expected 'INGESTION_CLIENT_KEY' environment variable to be set")

    # Resolve the CSVs relative to this script so the example works regardless
    # of the directory it is launched from.
    example_dir = Path(__file__).resolve().parent
    csv_data = [
        example_dir / file_name
        for file_name in ("channel_a.csv", "channel_b.csv", "channel_c.csv")
    ]

    telemetry_config, data = load_telemetry_config(csv_data, asset_name, ingestion_client_key)
    flows = parse_csv(data, telemetry_config)

    sift_channel_config = SiftChannelConfig(
        uri=sift_uri,
        apikey=apikey,
    )

    with use_sift_channel(sift_channel_config) as channel:
        # Create ingestion service using the telemetry config
        ingestion_service = IngestionService(
            channel=channel,
            config=telemetry_config,
        )

        # Create a new run as part of this ingestion
        run_name = f"{asset_name}-{datetime.now()}"
        ingestion_service.attach_run(channel, run_name, "example csv ingestion")

        # Send every flow through the buffered ingestion context manager.
        with ingestion_service.buffered_ingestion() as buffered_ingestion:
            buffered_ingestion.ingest_flows(*flows)
4 changes: 4 additions & 0 deletions python/examples/ingestion_with_csv/single_csv/.env-example
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
SIFT_API_URI=""
SIFT_API_KEY=""
ASSET_NAME=""
INGESTION_CLIENT_KEY=""
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
python-dotenv
sift-stack-py
Loading