From 3941713e1f157ccda6443cd6b9e9cae30f082b82 Mon Sep 17 00:00:00 2001 From: Mitchell Victoriano <47313912+MitchellAV@users.noreply.github.com> Date: Sat, 20 Jan 2024 22:46:02 -0800 Subject: [PATCH] Refactor data loading and querying function to improve performance --- solardatatools/dataio.py | 83 ++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 34 deletions(-) diff --git a/solardatatools/dataio.py b/solardatatools/dataio.py index bf180a23..f940aa2a 100644 --- a/solardatatools/dataio.py +++ b/solardatatools/dataio.py @@ -22,22 +22,7 @@ from typing import Callable, TypedDict, Any, Tuple, Dict from functools import wraps from datetime import datetime -import gzip - - -class SSHParams(TypedDict): - ssh_address_or_host: tuple[str, int] - ssh_username: str - ssh_private_key: str - remote_bind_address: tuple[str, int] - - -class DBConnectionParams(TypedDict): - database: str - user: str - password: str - host: str - port: int +import zlib def get_pvdaq_data(sysid=2, api_key="DEMO_KEY", year=2011, delim=",", standardize=True): @@ -241,14 +226,16 @@ def load_redshift_data( limit: int | None = None, verbose: bool = False, ) -> pd.DataFrame: - """Loads data based on a site id from a Redshift database into a Pandas DataFrame using an SSH tunnel + """Queries a SunPower dataset by site id and returns a Pandas DataFrame + + Request an API key by registering at https://pvdb.slacgismo.org and emailing slacgismotutorials@gmail.com with your information and use case. Parameters ---------- siteid : str site id to query api_key : str - api key for authentication to redshift + api key for authentication to query data column : str meas_name to query (default ac_power) sensor : int, list[int], optional @@ -278,10 +265,11 @@ class QueryParams(TypedDict): limit: int | None def decompress_data_to_dataframe(encoded_data): - # Decompress gzip data - decompressed_buffer = BytesIO(encoded_data) - with gzip.GzipFile(fileobj=decompressed_buffer, mode="rb") as gz: - decompressed_data = gz.read().decode("utf-8") + # Decode the data + decoded_data = base64.b64decode(encoded_data) + + # Decompress the data + decompressed_data = zlib.decompress(decoded_data).decode("utf-8") # Attempt to read the decompressed data as CSV df = pd.read_csv(StringIO(decompressed_data)) @@ -308,7 +296,7 @@ def wrapper(*args, **kwargs): def query_redshift_w_api( params: QueryParams, page: int, is_batch: bool = False ) -> requests.Response: - url = "https://lmojfukey3rylrbqughzlfu6ca0ujdby.lambda-url.us-west-1.on.aws/" + url = "https://api.pvdb.slacgismo.org/v1/query" payload = { "api_key": params.get("api_key"), "siteid": params.get("siteid"), @@ -330,7 +318,9 @@ def query_redshift_w_api( if limit is None: payload.pop("limit") - response = requests.post(url, json=payload, timeout=60 * 5) + response = requests.post( + url, json=payload, timeout=60 * 5, headers={"Accept-Encoding": "gzip"} + ) if response.status_code != 200: error = response.json() @@ -343,6 +333,40 @@ def query_redshift_w_api( return response + @timing(verbose) + def get_query_info(params: QueryParams) -> requests.Response: + url = "https://api.pvdb.slacgismo.org/v1/query/info/" + payload = { + "api_key": params.get("api_key"), + "siteid": params.get("siteid"), + "column": params.get("column"), + "sensor": params.get("sensor"), + "tmin": str(params.get("tmin")), + "tmax": str(params.get("tmax")), + "limit": str(params.get("limit")), + } + + if sensor is None: + payload.pop("sensor") + if tmin is None: + payload.pop("tmin") + if tmax is None: + payload.pop("tmax") + if limit is None: + payload.pop("limit") + + response = requests.post(url, json=payload, timeout=60 * 5) + + if response.status_code != 200: + error = response.json() + print(error) + error_msg = error["error"] + raise Exception( + f"Query failed with status code {response.status_code}: {error_msg}" + ) + + return response + def fetch_data( query_params: QueryParams, df_list: list[pd.DataFrame], index: int, page: int ): @@ -375,9 +399,7 @@ def fetch_data( } try: - batch_df: requests.Response = query_redshift_w_api( - query_params, 0, is_batch=True - ) + batch_df: requests.Response = get_query_info(query_params) data = batch_df.json() except Exception as e: raise e @@ -396,13 +418,6 @@ def fetch_data( if batches <= batch_size: loops = 1 batch_size = batches - # if limit is not None: - # if limit <= max_limit: - # loops = 1 - # batch_size = 1 - # else: - # loops = math.ceil(limit / max_limit) - # batch_size = math.ceil(batches / loops) running_count = total_count page = 0