-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetl.py
59 lines (36 loc) · 1.93 KB
/
etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import pandas as pd
from sqlalchemy import create_engine
# Define source and destination details.
# Both connection strings are SQLAlchemy database URLs handed to ``create_engine``
# (e.g. "sqlite:///file.db" or "postgresql://user:password@host/dbname"); a plain
# CSV path will not work here. Replace them with your own databases.
source_connection_string = "sqlite:///source.db"  # Database the extract functions read from
destination_connection_string = "sqlite:///destination.db"  # Database load_data writes to
source_table = "users"  # Table read (SELECT *) by the extract functions
destination_table = "users"  # Table written by load_data
# Extract data from the source
def extract_data_from_source1(*, connection_string=None, table=None):
    """Read every row of the first source table into a DataFrame.

    Args:
        connection_string: SQLAlchemy database URL to read from. Defaults to
            the module-level ``source_connection_string``.
        table: Name of the table to read. Defaults to the module-level
            ``source_table``. The name is interpolated directly into the SQL
            text, so it must come from trusted configuration, never from user
            input.

    Returns:
        A pandas DataFrame holding the result of ``SELECT * FROM <table>``.
    """
    if connection_string is None:
        connection_string = source_connection_string
    if table is None:
        table = source_table
    engine = create_engine(connection_string)
    try:
        return pd.read_sql_query(f"SELECT * FROM {table}", engine)
    finally:
        # A fresh Engine (with its own connection pool) is built on every call,
        # so release its connections explicitly instead of relying on GC.
        engine.dispose()
def extract_data_from_source2(*, connection_string=None, table=None):
    """Read every row of the second source table into a DataFrame.

    NOTE: with no arguments this reads the same database and table as the
    first extract function (``source_connection_string`` / ``source_table``).
    Pass ``connection_string`` and/or ``table`` to point it at a genuinely
    different source.

    Args:
        connection_string: SQLAlchemy database URL to read from. Defaults to
            the module-level ``source_connection_string``.
        table: Name of the table to read. Defaults to the module-level
            ``source_table``. The name is interpolated directly into the SQL
            text, so it must come from trusted configuration only.

    Returns:
        A pandas DataFrame holding the result of ``SELECT * FROM <table>``.
    """
    if connection_string is None:
        connection_string = source_connection_string
    if table is None:
        table = source_table
    engine = create_engine(connection_string)
    try:
        return pd.read_sql_query(f"SELECT * FROM {table}", engine)
    finally:
        # Release the per-call Engine's connection pool deterministically.
        engine.dispose()
# Transform data (cleaning, calculations, etc.)
def transform_data(
    data_from_source1,
    data_from_source2,
    *,
    source_column="existing_column",
    target_column="new_column",
    factor=2,
):
    """Return a transformed copy of the first source's DataFrame.

    This is an example transformation: it adds ``target_column`` whose values
    are ``source_column`` multiplied by ``factor``. Replace or extend it with
    real cleaning/standardization logic.

    Args:
        data_from_source1: DataFrame from the first source; must contain
            ``source_column``.
        data_from_source2: DataFrame from the second source. Currently
            unused; accepted so the pipeline can later join or merge sources.
        source_column: Column to derive the new values from.
        target_column: Column to create (or overwrite) in the result.
        factor: Multiplier applied to ``source_column``.

    Returns:
        A new DataFrame; ``data_from_source1`` itself is not modified.

    Raises:
        KeyError: If ``source_column`` is not a column of ``data_from_source1``.
    """
    # Work on a copy so the caller's DataFrame is left untouched.
    transformed_data = data_from_source1.copy()
    transformed_data[target_column] = transformed_data[source_column] * factor
    return transformed_data
# Load data to the destination
def load_data(transformed_data, *, connection_string=None, table=None, if_exists="replace"):
    """Write a DataFrame to the destination table and return it unchanged.

    Args:
        transformed_data: DataFrame to write. Its index is not written.
        connection_string: SQLAlchemy database URL to write to. Defaults to
            the module-level ``destination_connection_string``.
        table: Destination table name. Defaults to the module-level
            ``destination_table``.
        if_exists: Forwarded to ``DataFrame.to_sql``. The default
            ``"replace"`` drops any existing table before inserting, so
            previously loaded rows are discarded.

    Returns:
        The same ``transformed_data`` object that was passed in.
    """
    if connection_string is None:
        connection_string = destination_connection_string
    if table is None:
        table = destination_table
    destination_engine = create_engine(connection_string)
    try:
        transformed_data.to_sql(table, destination_engine, index=False, if_exists=if_exists)
    finally:
        # A new Engine is created per call; release its pool explicitly.
        destination_engine.dispose()
    return transformed_data
def run_etl():
    """Run the pipeline: extract from both sources, transform, then load.

    Prints a confirmation message after the load step returns. An exception
    from any step propagates to the caller and the message is not printed.
    """
    data_from_source1 = extract_data_from_source1()
    data_from_source2 = extract_data_from_source2()
    # ... extract from other sources here if needed, and extend
    # transform_data's parameters to accept them before passing them on.
    transformed_data = transform_data(data_from_source1, data_from_source2)
    load_data(transformed_data)
    print("ETL process completed successfully!")


if __name__ == "__main__":
    run_etl()