-
Notifications
You must be signed in to change notification settings - Fork 9
/
upload_to_snowflake.py
86 lines (68 loc) · 2.86 KB
/
upload_to_snowflake.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import os
import snowflake.connector
import pandas as pd
from datetime import datetime, timedelta
import requests
def upload_to_snowflake(file_path, stage_name, user, password, account, warehouse, database, schema, role):
conn = snowflake.connector.connect(
user=user,
password=password,
account=account,
warehouse=warehouse,
database=database,
schema=schema,
role=role
)
try:
cursor = conn.cursor()
cursor.execute(f'USE ROLE {role};')
cursor.execute(f'USE DATABASE {database};')
cursor.execute(f'USE SCHEMA {schema};')
cursor.execute(f'CREATE STAGE IF NOT EXISTS {stage_name};')
# Put the CSV file into the Snowflake stage
cursor.execute(f'PUT file://{file_path} @{stage_name} AUTO_COMPRESS=FALSE;')
print(f"Succesfully uploaded {file_path} file into stage: {database}.{schema}.{stage_name}")
except snowflake.connector.errors.ProgrammingError as e:
print(f"Snowflake ProgrammingError: {e}")
finally:
if cursor:
cursor.close()
conn.close()
def process_csv(file_path):
# Load CSV as a Pandas DataFrame
df = pd.read_csv(file_path)
# Remove duplicates based on the 'ID' column
df = df.drop_duplicates(subset=['ID'])
# Add a new column with the date in the format YYYY-MM-DD
yesterday = datetime.now() - timedelta(days=1)
df['updated_at'] = datetime.now()
# Save the DataFrame back to the CSV file
df.to_csv(file_path, index=False)
if __name__ == "__main__":
# Snowflake credentials
user=os.getenv("SNOWFLAKE_USER")
password=os.getenv("SNOWFLAKE_PASSWORD")
account=os.getenv("SNOWFLAKE_ACCOUNT")
warehouse=os.getenv("SNOWFLAKE_WAREHOUSE")
database=os.getenv("SNOWFLAKE_DATABASE")
schema=os.getenv("SNOWFLAKE_SCHEMA")
role=os.getenv("SNOWFLAKE_ROLE")
stage_name=os.getenv("SNOWFLAKE_STAGE")
# Define the file URL based on the day before the present day
yesterday = datetime.now() - timedelta(days=1)
file_url = f'https://raw.githubusercontent.com/beefsack/bgg-ranking-historicals/master/{yesterday.strftime("%Y-%m-%d")}.csv'
# Download the CSV file
response = requests.get(file_url)
if response.status_code == 200:
# Save the downloaded CSV file locally
local_file_path = f"rankings_{yesterday.strftime('%Y-%m-%d')}.csv"
with open(local_file_path, "wb") as f:
f.write(response.content)
# Process the CSV file
process_csv(local_file_path)
# Upload the processed CSV file to Snowflake
upload_to_snowflake(local_file_path, stage_name, user, password, account, warehouse, database, schema, role)
# Clean up: Remove the local file
os.remove(local_file_path)
else:
raise Exception(f"Failed to download CSV file. Status code: {response.status_code}")