-
Notifications
You must be signed in to change notification settings - Fork 0
/
02_etl_flow.py
62 lines (47 loc) · 1.73 KB
/
02_etl_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from datetime import datetime, timedelta
import duckdb
import pandas as pd
from prefect import flow, task
import random
@task
def extract(date_to_fetch: str):
    """Download one day of sample maritime transaction data.

    Reads ``maritime_transactions_<date_to_fetch>.csv`` from the data folder
    of the PrefectHQ ``write-workflows-course`` GitHub repository.

    Args:
        date_to_fetch: Date string spliced verbatim into the CSV file name,
            e.g. ``"2024-10-05"``.

    Returns:
        The raw CSV contents as a ``pandas.DataFrame``.

    Raises:
        Exception: Whatever ``pandas.read_csv`` raises (e.g. a network error
            or a date with no published file). The error is printed and then
            re-raised, so the Prefect task is marked failed instead of
            silently returning ``None`` to the transform/load steps.
    """
    url = (
        "https://raw.githubusercontent.com/PrefectHQ/write-workflows-course/"
        f"refs/heads/main/data/maritime_transactions_{date_to_fetch}.csv"
    )
    try:
        df = pd.read_csv(url)
    except Exception as e:
        print(f"An error occurred while extracting data: {e}")
        # Re-raise: swallowing here made the task return None and let the
        # rest of the pipeline run on missing data.
        raise
    print(f"Raw data: {df.head()}")
    return df
@task
def transform(df: pd.DataFrame):
    """Add a USD-converted amount column to the extracted transactions.

    Each row's ``transaction_amount`` is multiplied by a fixed rate looked up
    from its ``currency`` code. The product is stored in a new ``amount_usd``
    column.

    Args:
        df: Raw transactions with ``transaction_amount`` and ``currency``
            columns. The DataFrame is modified in place.

    Returns:
        The same DataFrame, now including ``amount_usd``.

    Raises:
        ValueError: If any row's currency code has no entry in the rate table.
        Exception: Any other failure (e.g. a missing column) is printed and
            re-raised, so the Prefect task fails instead of returning ``None``.
    """
    # Fixed conversion rates into USD, hard-coded for this flow.
    rates = {"EUR": 1.1, "USD": 1.0, "GBP": 1.3, "JPY": 0.009, "CNY": 0.15}
    try:
        # Vectorized lookup instead of a row-wise apply. This also works on an
        # empty DataFrame, where apply(axis=1) returned a DataFrame that could
        # not be assigned to a single column.
        rate = df["currency"].map(rates)
        # Codes absent from ``rates`` (and missing codes) map to NaN. Report
        # them explicitly rather than writing NaN amounts.
        unknown = rate.isna()
        if unknown.any():
            codes = sorted(df.loc[unknown, "currency"].astype(str).unique())
            raise ValueError(f"No conversion rate for currency code(s): {codes}")
        df["amount_usd"] = df["transaction_amount"] * rate
    except Exception as e:
        print(f"An error occurred while transforming data: {e}")
        raise
    print(f"Transformed data: {df.head()}")
    return df
@task
def load(df: pd.DataFrame):
    """Append the transformed transactions to the local DuckDB database.

    Connects to ``maritime_transactions.db`` and inserts every row of ``df``
    into the ``maritime_transactions`` table. The table is not created here,
    so it must already exist.

    Args:
        df: Transformed transactions. Rows are inserted with ``SELECT *``, so
            the DataFrame's column order must match the table's columns.

    Raises:
        ValueError: If ``df`` is ``None`` (an upstream step produced no data).
        Exception: Any DuckDB error (e.g. the table does not exist) is
            printed and re-raised, so the Prefect task is marked failed.
    """
    if df is None:
        raise ValueError("load() received no data to insert (df is None)")
    # Bind before the try so ``finally`` never touches an unbound name. The
    # original raised UnboundLocalError there when connect() itself failed,
    # hiding the real error.
    conn = None
    try:
        conn = duckdb.connect("maritime_transactions.db")
        # DuckDB resolves the bare name ``df`` in this SQL to the local pandas
        # DataFrame variable of the same name.
        conn.execute("INSERT INTO maritime_transactions SELECT * FROM df")
        conn.commit()
    except Exception as e:
        print(f"An error occurred while loading data: {e}")
        raise
    finally:
        if conn is not None:
            conn.close()
@flow(log_prints=True, name="ETL Flow")
def etl(date_to_fetch: str):
    """Run extract, then transform, then load for a single date.

    ``log_prints=True`` routes ``print`` output from this flow into the
    Prefect logs.

    Args:
        date_to_fetch: Date string forwarded unchanged to ``extract``.
    """
    # Innermost call runs first: extract -> transform -> load.
    load(transform(extract(date_to_fetch=date_to_fetch)))
    print("ETL process completed.")
if __name__ == "__main__":
    # Script entry point: run the flow once for a fixed sample date.
    sample_date = "2024-10-05"
    etl(date_to_fetch=sample_date)