Skip to content

Commit

Permalink
Handles compressed data from lambda function
Browse files Browse the repository at this point in the history
  • Loading branch information
MitchellAV committed Nov 22, 2023
1 parent f66d548 commit be924e6
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions solardatatools/dataio.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from solardatatools.utilities import progress

from time import time
from io import StringIO
from io import StringIO, BytesIO
import base64
import os
import json
import requests
Expand All @@ -20,6 +21,7 @@
from typing import Callable, TypedDict, Any, Tuple, Dict
from functools import wraps
from datetime import datetime
import gzip


class SSHParams(TypedDict):
Expand Down Expand Up @@ -450,6 +452,23 @@ def load_redshift_data_remote(
Pandas DataFrame containing the queried data
"""

def decompress_data_to_dataframe(encoded_data):
    """Parse a gzip-compressed, UTF-8 encoded CSV payload into a DataFrame.

    The payload is expected to be raw gzip bytes; no Base64 decoding is
    performed here.

    :param encoded_data: bytes-like object holding the gzip-compressed CSV
        text (UTF-8 encoded).
    :return: Pandas DataFrame built from the decompressed CSV content.
    :raises OSError: if the payload is not valid gzip data.
    :raises pandas.errors.EmptyDataError: if the decompressed payload
        contains no CSV content.
    """
    compressed_stream = BytesIO(encoded_data)
    with gzip.GzipFile(fileobj=compressed_stream, mode="rb") as gz:
        # Let pandas read and decode the decompressed byte stream directly,
        # instead of first materializing the whole payload as a str and
        # wrapping it in a StringIO (which held an extra full copy in memory).
        return pd.read_csv(gz, encoding="utf-8")

def timing(verbose: bool = True):
def decorator(func: Callable):
@wraps(func)
Expand Down Expand Up @@ -495,10 +514,8 @@ def query_redshift_w_api(page: int) -> pd.DataFrame:

print(f"Content size: {len(response.content)}")

data = response.json()
json_data = json.loads(data)

df = pd.DataFrame(json_data)
df = decompress_data_to_dataframe(response.content)
# df = pd.DataFrame(json_data)
return df

# page = 0
Expand Down Expand Up @@ -527,8 +544,6 @@ def fetch_data(df_list: list[pd.DataFrame], index: int, page: int):
if new_df.empty:
raise Exception("Empty dataframe returned from query")
print(page, len(new_df))
# print(new_df.head(10))
# print(new_df.tail(10))
df_list[index] = new_df

except Exception as e:
Expand Down

0 comments on commit be924e6

Please sign in to comment.