From 79892476aaa142509f25b4e9dec48f606344230e Mon Sep 17 00:00:00 2001 From: Aryan Bhosale <36108149+aryanbhosale@users.noreply.github.com> Date: Thu, 25 Jul 2024 15:06:27 +0530 Subject: [PATCH] solis (#162) * solis * solis api * solis inverter_example * rm solaredge * refactored * coroutines * load env * rm unnecessary funcs * dashboard update with solis * rm code * async rm * asyncio * rm async def main * rm unnecessary vars * rm unnecessary var * rm asyncio ex --- .env.example | 8 +- api/app/api.py | 1 - dashboards/dashboard_2/app.py | 51 ++- examples/example.py | 1 - examples/inverter_example.py | 4 +- quartz_solar_forecast/data.py | 39 +- quartz_solar_forecast/inverters/README.md | 2 +- quartz_solar_forecast/inverters/solaredge.py | 70 ---- quartz_solar_forecast/inverters/solis.py | 390 +++++++++++++++++++ quartz_solar_forecast/pydantic_models.py | 4 +- 10 files changed, 453 insertions(+), 117 deletions(-) delete mode 100644 quartz_solar_forecast/inverters/solaredge.py create mode 100644 quartz_solar_forecast/inverters/solis.py diff --git a/.env.example b/.env.example index 947af4e0..f181c34e 100644 --- a/.env.example +++ b/.env.example @@ -7,10 +7,12 @@ ENPHASE_API_KEY = 'user_enphase_api_key' # Replace ENPHASE_CLIENT_ID below with the actual client id AUTHORIZATION_URL = 'https://api.enphaseenergy.com/oauth/authorize?response_type=code&client_id=ENPHASE_CLIENT_ID' -# User needs to add their SolarEdge API details +# User needs to add their Solis Cloud API details -SOLAREDGE_ACCOUNT_KEY='user_solaredge_account_key' -SOLAREDGE_USER_KEY='user_solaredge_user_key' +SOLIS_CLOUD_API_KEY = 'user_solis_account_key' +SOLIS_CLOUD_API_KEY_SECRET = 'user_solis_user_key' +SOLIS_CLOUD_API_URL = 'https://www.soliscloud.com' +SOLIS_CLOUD_API_PORT = '13333' # This section is for OpenMeteo setup diff --git a/api/app/api.py b/api/app/api.py index 7324ffeb..a00b593a 100644 --- a/api/app/api.py +++ b/api/app/api.py @@ -2,7 +2,6 @@ from fastapi.middleware.cors import CORSMiddleware from quartz_solar_forecast.pydantic_models import PVSite from quartz_solar_forecast.forecast import run_forecast -from datetime import datetime app = FastAPI() diff --git a/dashboards/dashboard_2/app.py b/dashboards/dashboard_2/app.py index 6701fd7a..d6486a53 100644 --- a/dashboards/dashboard_2/app.py +++ b/dashboards/dashboard_2/app.py @@ -12,6 +12,14 @@ import json from urllib.parse import urlencode from PIL import Image +import asyncio + +from quartz_solar_forecast.pydantic_models import PVSite +from quartz_solar_forecast.forecasts import forecast_v1_tilt_orientation +from quartz_solar_forecast.forecast import predict_tryolabs +from quartz_solar_forecast.data import get_nwp, process_pv_data +from quartz_solar_forecast.inverters.enphase import process_enphase_data +from quartz_solar_forecast.inverters.solis import SolisData, get_solis_data # Load environment variables load_dotenv() @@ -23,12 +31,6 @@ # Add the parent directory to the Python path sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from quartz_solar_forecast.pydantic_models import PVSite -from quartz_solar_forecast.forecasts import forecast_v1_tilt_orientation -from quartz_solar_forecast.forecast import predict_tryolabs -from quartz_solar_forecast.data import get_nwp, process_pv_data -from quartz_solar_forecast.inverters.enphase import process_enphase_data - # Get the directory of the current script script_dir = os.path.dirname(os.path.abspath(__file__)) @@ -144,11 +146,14 @@ def make_pv_data( ts: pd.Timestamp, access_token: str = None, enphase_system_id: str = None, + solis_data: pd.DataFrame = None ) -> xr.Dataset: live_generation_kw = None if site.inverter_type == "enphase" and access_token and enphase_system_id: live_generation_kw = get_enphase_data(enphase_system_id, access_token) + elif site.inverter_type == "solis" and solis_data is not None: + live_generation_kw = solis_data da = process_pv_data(live_generation_kw, ts, site) return da @@ -160,6 +165,7 @@ def predict_ocf( nwp_source: str = "icon", access_token: str = None, enphase_system_id: str = None, + solis_data: pd.DataFrame = None ): if ts is None: ts = pd.Timestamp.now().round("15min") @@ -168,7 +174,7 @@ def predict_ocf( nwp_xr = get_nwp(site=site, ts=ts, nwp_source=nwp_source) pv_xr = make_pv_data( - site=site, ts=ts, access_token=access_token, enphase_system_id=enphase_system_id + site=site, ts=ts, access_token=access_token, enphase_system_id=enphase_system_id, solis_data=solis_data ) pred_df = forecast_v1_tilt_orientation(nwp_source, nwp_xr, pv_xr, ts, model=model) @@ -181,9 +187,10 @@ def run_forecast( nwp_source: str = "icon", access_token: str = None, enphase_system_id: str = None, + solis_data: pd.DataFrame = None ) -> pd.DataFrame: if model == "gb": - return predict_ocf(site, None, ts, nwp_source, access_token, enphase_system_id) + return predict_ocf(site, None, ts, nwp_source, access_token, enphase_system_id, solis_data) elif model == "xgb": return predict_tryolabs(site, ts) else: @@ -192,7 +199,8 @@ def run_forecast( def fetch_data_and_run_forecast( site: PVSite, access_token: str = None, - enphase_system_id: str = None + enphase_system_id: str = None, + solis_data: pd.DataFrame = None ): with st.spinner("Running forecast..."): try: @@ -208,6 +216,7 @@ def fetch_data_and_run_forecast( ts=ts, access_token=access_token, enphase_system_id=enphase_system_id, + solis_data=solis_data ) # Create a site without inverter for comparison @@ -246,10 +255,11 @@ def fetch_data_and_run_forecast( longitude = st.sidebar.number_input("Longitude", min_value=-180.0, max_value=180.0, value=-1.25, step=0.01) capacity_kwp = st.sidebar.number_input("Capacity (kWp)", min_value=0.1, value=1.25, step=0.01) -inverter_type = st.sidebar.selectbox("Select Inverter", ["No Inverter", "Enphase"]) +inverter_type = st.sidebar.selectbox("Select Inverter", ["No Inverter", "Enphase", "Solis"]) access_token = None enphase_system_id = None +solis_data = None if inverter_type == "Enphase": if "access_token" not in st.session_state: @@ -258,6 +268,8 @@ def fetch_data_and_run_forecast( access_token, enphase_system_id = st.session_state["access_token"], os.getenv( "ENPHASE_SYSTEM_ID" ) +elif inverter_type == "Solis": + solis_data = SolisData() if st.sidebar.button("Run Forecast"): if inverter_type == "Enphase" and (access_token is None or enphase_system_id is None): @@ -270,12 +282,21 @@ def fetch_data_and_run_forecast( latitude=latitude, longitude=longitude, capacity_kwp=capacity_kwp, - inverter_type="enphase" if inverter_type == "Enphase" else "none" # Changed this line + inverter_type=inverter_type.lower() ) - predictions_df, ts = fetch_data_and_run_forecast( - site, access_token, enphase_system_id - ) + # Fetch data based on the selected inverter type + if inverter_type == "Enphase": + predictions_df, ts = fetch_data_and_run_forecast( + site, access_token, enphase_system_id + ) + elif inverter_type == "Solis": + solis_df = asyncio.run(get_solis_data()) + predictions_df, ts = fetch_data_and_run_forecast( + site, solis_data=solis_df + ) + else: + predictions_df, ts = fetch_data_and_run_forecast(site) if predictions_df is not None: st.success("Forecast completed successfully!") @@ -308,7 +329,7 @@ def fetch_data_and_run_forecast( y=["power_kw", "power_kw_no_live_pv"], title="Forecasted Power Generation Comparison", labels={ - "power_kw": "Forecast with selected inverter type", + "power_kw": f"Forecast with {inverter_type}", "power_kw_no_live_pv": "Forecast without recent PV data" } ) diff --git a/examples/example.py b/examples/example.py index 45f68673..2b928c7d 100644 --- a/examples/example.py +++ b/examples/example.py @@ -13,6 +13,5 @@ def main(): print(predictions_df) print(f"Max: {predictions_df['power_kw'].max()}") - if __name__ == "__main__": main() diff --git a/examples/inverter_example.py b/examples/inverter_example.py index 7d7b1ee1..af38932e 100644 --- a/examples/inverter_example.py +++ b/examples/inverter_example.py @@ -13,8 +13,8 @@ def main(save_outputs: bool = False): timestamp_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') ts = pd.to_datetime(timestamp_str) - # make input data with live enphase data - site_live = PVSite(latitude=51.75, longitude=-1.25, capacity_kwp=1.25, inverter_type="enphase") + # make input data with live enphase or solis data + site_live = PVSite(latitude=51.75, longitude=-1.25, capacity_kwp=1.25, inverter_type="solis") # inverter_type="enphase" or "solis" # make input data with nan data site_no_live = PVSite(latitude=51.75, longitude=-1.25, capacity_kwp=1.25) diff --git a/quartz_solar_forecast/data.py b/quartz_solar_forecast/data.py index 037db472..5ec54892 100644 --- a/quartz_solar_forecast/data.py +++ b/quartz_solar_forecast/data.py @@ -1,27 +1,26 @@ """ Function to get NWP data and create fake PV dataset""" -import json import ssl from datetime import datetime import os - import numpy as np import pandas as pd -import requests import xarray as xr - import openmeteo_requests import requests_cache +import asyncio + from retry_requests import retry +from typing import Optional from quartz_solar_forecast.pydantic_models import PVSite from quartz_solar_forecast.inverters.enphase import get_enphase_data -from quartz_solar_forecast.inverters.solaredge import get_site_coordinates, get_site_list, get_solaredge_data +from quartz_solar_forecast.inverters.solis import get_solis_data ssl._create_default_https_context = ssl._create_unverified_context from dotenv import load_dotenv -system_id = os.getenv('ENPHASE_SYSTEM_ID') +load_dotenv() def get_nwp(site: PVSite, ts: datetime, nwp_source: str = "icon") -> xr.Dataset: """ @@ -143,7 +142,7 @@ def format_nwp_data(df: pd.DataFrame, nwp_source:str, site: PVSite): ) return data_xr -def process_pv_data(live_generation_kw: pd.DataFrame, ts: pd.Timestamp, site: PVSite) -> xr.Dataset: +def process_pv_data(live_generation_kw: Optional[pd.DataFrame], ts: pd.Timestamp, site: PVSite) -> xr.Dataset: """ Process PV data and create an xarray Dataset. @@ -152,7 +151,7 @@ def process_pv_data(live_generation_kw: pd.DataFrame, ts: pd.Timestamp, site: PV :param site: PV site information :return: xarray Dataset containing processed PV data """ - if live_generation_kw is not None: + if live_generation_kw is not None and not live_generation_kw.empty: # get the most recent data recent_pv_data = live_generation_kw[live_generation_kw['timestamp'] <= ts] power_kw = np.array([np.array(recent_pv_data["power_kw"].values, dtype=np.float64)]) @@ -181,7 +180,7 @@ def process_pv_data(live_generation_kw: pd.DataFrame, ts: pd.Timestamp, site: PV def make_pv_data(site: PVSite, ts: pd.Timestamp) -> xr.Dataset: """ - Make PV data by combining live data from SolarEdge or Enphase and fake PV data. + Make PV data by combining live data from Enphase or Solis and fake PV data. Later we could add PV history here. :param site: the PV site :param ts: the timestamp of the site @@ -191,20 +190,16 @@ def make_pv_data(site: PVSite, ts: pd.Timestamp) -> xr.Dataset: live_generation_kw = None # Check if the site has an inverter type specified - if site.inverter_type == 'solaredge': - # Fetch the list of site IDs associated with the SolarEdge account - site_ids = get_site_list() - # Find the site ID that matches the site's latitude and longitude - matching_site_ids = [s_id for s_id in site_ids if abs(site.latitude - lat) < 1e-6 and abs(site.longitude - lon) < 1e-6 for lat, lon in get_site_coordinates(s_id)] - if not matching_site_ids: - raise ValueError("Site not found in the list of associated sites.") - elif len(matching_site_ids) > 1: - raise ValueError("Multiple sites found matching the given latitude and longitude.") + if site.inverter_type == 'enphase': + system_id = os.getenv('ENPHASE_SYSTEM_ID') + if system_id: + live_generation_kw = get_enphase_data(system_id) else: - site_id = matching_site_ids[0] - live_generation_kw = get_solaredge_data(site_id) - elif site.inverter_type == 'enphase': - live_generation_kw = get_enphase_data(system_id) + print("Error: Enphase inverter ID is not provided in the environment variables.") + elif site.inverter_type == 'solis': + live_generation_kw = asyncio.run(get_solis_data()) + if live_generation_kw is None: + print("Error: Failed to retrieve Solis inverter data.") else: # If no inverter type is specified or not recognized, set live_generation_kw to None live_generation_kw = None diff --git a/quartz_solar_forecast/inverters/README.md b/quartz_solar_forecast/inverters/README.md index 2c1d5520..dc63ac45 100644 --- a/quartz_solar_forecast/inverters/README.md +++ b/quartz_solar_forecast/inverters/README.md @@ -62,5 +62,5 @@ Open-Source-Quartz-Solar-Forecast/ 4. Install the requirements by entering `pip install -r requirements.txt` and `pip install -e .` 5. Install `plotly` by entering `pip install plotly` 6. Create a `.env` file in the root directory, i.e. `Open-Source-Quartz-Solar-Forecast` -7. Add your Solar Inverter's user credentials along with environment variables in the `.env` file, refer to the `.env.example` file for Enphase & SolarEdge credential examples +7. Add your Solar Inverter's user credentials along with environment variables in the `.env` file, refer to the `.env.example` file for Enphase & Solis credential examples 8. Run the `inverter_example.py` file by entering `python examples/inverter_example.py` diff --git a/quartz_solar_forecast/inverters/solaredge.py b/quartz_solar_forecast/inverters/solaredge.py deleted file mode 100644 index 4d945eb4..00000000 --- a/quartz_solar_forecast/inverters/solaredge.py +++ /dev/null @@ -1,70 +0,0 @@ -import requests -import os - -from dotenv import load_dotenv - -load_dotenv() - -SOLAREDGE_ACCOUNT_KEY = os.getenv('SOLAREDGE_ACCOUNT_KEY') -SOLAREDGE_USER_KEY = os.getenv('SOLAREDGE_USER_KEY') - -def get_site_coordinates(site_id: str) -> tuple[float, float]: - """ - Fetch the latitude and longitude of a SolarEdge site. - :param site_id: The site ID - :return: A tuple of (latitude, longitude) - """ - base_url = "https://monitoringapi.solaredge.com/v2" - headers = { - 'X-Account-Key': SOLAREDGE_ACCOUNT_KEY, - 'X-API-Key': SOLAREDGE_USER_KEY - } - - site_details_url = f"{base_url}/sites/{site_id}" - response = requests.get(site_details_url, headers=headers) - response.raise_for_status() - data = response.json() - - latitude = data['location']['latitude'] - longitude = data['location']['longitude'] - return latitude, longitude - -def get_site_list(): - """ - Fetch the list of sites associated with the account. - :return: A list of site IDs - """ - base_url = "https://monitoringapi.solaredge.com/v2" - headers = { - 'X-Account-Key': SOLAREDGE_ACCOUNT_KEY, - 'X-API-Key': SOLAREDGE_USER_KEY - } - - site_list_url = f"{base_url}/sites" - response = requests.get(site_list_url, headers=headers) - response.raise_for_status() - data = response.json() - - site_ids = [site['siteId'] for site in data] - return site_ids - -def get_solaredge_data(site_id: str) -> float: - """ - Get live PV generation data from the SolarEdge Monitoring API v2. - :param site_id: Site ID for the SolarEdge API - :return: Live PV generation in Watt-hours, assumed to be a floating-point number - """ - base_url = "https://monitoringapi.solaredge.com/v2" - headers = { - 'X-Account-Key': SOLAREDGE_ACCOUNT_KEY, - 'X-API-Key': SOLAREDGE_USER_KEY - } - - site_overview_url = f"{base_url}/sites/{site_id}/overview" - response = requests.get(site_overview_url, headers=headers) - response.raise_for_status() - data = response.json() - - # Extracting live generation data assuming it's in Watt-hours - live_generation_wh = data['production']['total'] - return live_generation_wh \ No newline at end of file diff --git a/quartz_solar_forecast/inverters/solis.py b/quartz_solar_forecast/inverters/solis.py new file mode 100644 index 00000000..2dbeea2c --- /dev/null +++ b/quartz_solar_forecast/inverters/solis.py @@ -0,0 +1,390 @@ +from __future__ import annotations +import asyncio +import os +import pandas as pd +from datetime import datetime, timedelta, timezone +from aiohttp import ClientSession, ClientError +import hashlib +import hmac +import base64 +import re +from enum import Enum +from http import HTTPStatus +import json +from typing import Any +import async_timeout + +from dotenv import load_dotenv + +SOLIS_CLOUD_API_URL = os.environ.get('SOLIS_CLOUD_API_URL', 'https://www.soliscloud.com') +SOLIS_CLOUD_API_PORT = os.environ.get('SOLIS_CLOUD_API_PORT', '13333') + +# VERSION +RESOURCE_PREFIX = '/v1/api/' + +VERB = "POST" + +# Endpoints +INVERTER_LIST = RESOURCE_PREFIX + 'inverterList' +INVERTER_DAY = RESOURCE_PREFIX + 'inverterDay' +class SoliscloudAPI(): + """Class with functions for reading data from the Soliscloud Portal.""" + + class SolisCloudError(Exception): + """ + Exception raised for timeouts during calls. + """ + + def __init__(self, message="SolisCloud API error"): + + self.message = message + super().__init__(self.message) + + class HttpError(SolisCloudError): + """ + Exception raised for HTTP errors during calls. + """ + + def __init__(self, statuscode, message=None): + self.statuscode = statuscode + self.message = message + if not message: + if statuscode == 408: + now = datetime.now().strftime("%d-%m-%Y %H:%M GMT") + self.message = f"Your system time is different from server time, your time is {now}" + else: + self.message = f"Http status code: {statuscode}" + super().__init__(self.message) + + class TimeoutError(SolisCloudError): + """ + Exception raised for timeouts during calls. + """ + + def __init__(self, message="Timeout error occurred"): + + self.message = message + super().__init__(self.message) + + class ApiError(SolisCloudError): + """ + Exception raised for errors during API calls. + """ + + def __init__(self, message="Undefined API error occurred", code="Unknown", response=None): + + self.message = message + self.code = code + self.response = response + super().__init__(self.message) + + def __str__(self): + return f'API returned an error: {self.message}, error code: {self.code}, response: {self.response}' + + def __init__(self, domain: str, session: ClientSession) -> None: + self._domain = domain.rstrip("/") + self._session: ClientSession = session + + class DateFormat(Enum): + DAY = 0 + MONTH = 1 + YEAR = 2 + + @property + def domain(self) -> str: + """ Domain name.""" + return self._domain + + @property + def session(self) -> ClientSession: + """ aiohttp client session ID.""" + return self._session + + # All methods take key and secret as positional arguments followed by + # one or more keyword arguments + + async def inverter_list(self, key_id: str, secret: bytes, /, *, + page_no: int = 1, + page_size: int = 20, + station_id: str = None, + nmi_code: str = None + ) -> dict[str, str]: + """Inverter list""" + + if page_size > 100: + raise SoliscloudAPI.SolisCloudError("PageSize must be <= 100") + + params: dict[str, Any] = {'pageNo': page_no, 'pageSize': page_size} + if station_id is not None: + # If not specified all inverters for all stations for key_id are returned + params['stationId'] = station_id + if nmi_code is not None: + params['nmiCode'] = nmi_code + return await self._get_records(INVERTER_LIST, key_id, secret, params) + + async def inverter_day(self, key_id: str, secret: bytes, /, *, + currency: str, + time: str, + time_zone: int, + inverter_id: int = None, + inverter_sn: str = None + ) -> dict[str, str]: + """Inverter daily graph""" + + SoliscloudAPI._verify_date(SoliscloudAPI.DateFormat.DAY, time) + params: dict[str, Any] = {'money': currency, 'time': time, 'timeZone': time_zone} + + if (inverter_id is not None and inverter_sn is None): + params['id'] = inverter_id + elif (inverter_id is None and inverter_sn is not None): + params['sn'] = inverter_sn + else: + raise SoliscloudAPI.SolisCloudError("Only pass one of inverter_id or inverter_sn \ + as identifier") + + return await self._get_data(INVERTER_DAY, key_id, secret, params) + + async def _get_records(self, canonicalized_resource: str, key_id: str, secret: bytes, params: dict[str, Any]): + """ + Return all records from call + """ + + header: dict[str, str] = SoliscloudAPI._prepare_header(key_id, secret, + params, canonicalized_resource) + + url = f"{self.domain}{canonicalized_resource}" + try: + result = await self._post_data_json(url, header, params) + return result['page']['records'] + except KeyError as err: + raise SoliscloudAPI.ApiError("Malformed data", result) from err + + async def _get_data(self, canonicalized_resource: str, key_id: str, secret: bytes, params: dict[str, Any]): + """ + Return data from call + """ + + header: dict[str, str] = SoliscloudAPI._prepare_header(key_id, secret, + params, canonicalized_resource) + + url = f"{self.domain}{canonicalized_resource}" + result = await self._post_data_json(url, header, params) + + return result + + @staticmethod + def _now() -> datetime.datetime: + return datetime.now(timezone.utc) + + @staticmethod + def _prepare_header( + key_id: str, + secret: bytes, + body: dict[str, str], + canonicalized_resource: str + ) -> dict[str, str]: + content_md5 = base64.b64encode( + hashlib.md5(json.dumps(body, separators=(",", ":")).encode('utf-8')).digest() + ).decode('utf-8') + + content_type = "application/json" + + date = SoliscloudAPI._now().strftime("%a, %d %b %Y %H:%M:%S GMT") + + encrypt_str = (VERB + "\n" + + content_md5 + "\n" + + content_type + "\n" + + date + "\n" + + canonicalized_resource + ) + hmac_obj = hmac.new( + secret, + msg=encrypt_str.encode('utf-8'), + digestmod=hashlib.sha1 + ) + sign = base64.b64encode(hmac_obj.digest()) + authorization = "API " + key_id + ":" + sign.decode('utf-8') + + header: dict[str, str] = { + "Content-MD5": content_md5, + "Content-Type": content_type, + "Date": date, + "Authorization": authorization + } + return header + + async def _post_data_json(self, + url: str, + header: dict[str, Any], + params: dict[str, Any] + ) -> dict[str, Any]: + """ Http-post data to specified domain/canonicalized_resource. """ + + resp = None + result = None + if self._session is None: + raise SoliscloudAPI.SolisCloudError("aiohttp.ClientSession not set") + try: + async with async_timeout.timeout(10): + resp = await SoliscloudAPI._do_post_aiohttp(self._session, url, params, header) + + result = await resp.json() + if resp.status == HTTPStatus.OK: + if result['code'] != '0': + raise SoliscloudAPI.ApiError(result['msg'], result['code']) + return result['data'] + else: + raise SoliscloudAPI.HttpError(resp.status) + except asyncio.TimeoutError as err: + if resp is not None: + await resp.release() + raise SoliscloudAPI.TimeoutError() from err + except ClientError as err: + if resp is not None: + await resp.release() + raise SoliscloudAPI.ApiError(err) + except (KeyError, TypeError) as err: + raise SoliscloudAPI.ApiError("Malformed server response", + response=result) from err + + @staticmethod + async def _do_post_aiohttp( + session, + url: str, + params: dict[str, Any], + header: dict[str, Any] + ) -> dict[str, Any]: + """ Allows mocking for unit tests.""" + return await session.post(url, json=params, headers=header) + + @staticmethod + def _verify_date(format: SoliscloudAPI.DateFormat, date: str): + rex = re.compile("^[0-9]{4}-[0-9]{2}-[0-9]{2}$") + err = SoliscloudAPI.SolisCloudError("time must be in format YYYY-MM-DD") + if format == SoliscloudAPI.DateFormat.MONTH: + rex = re.compile("^[0-9]{4}-[0-9]{2}$") + err = SoliscloudAPI.SolisCloudError("month must be in format YYYY-MM") + elif format == SoliscloudAPI.DateFormat.YEAR: + rex = re.compile("^[0-9]{4}$") + err = SoliscloudAPI.SolisCloudError("year must be in format YYYY") + if not rex.match(date): + raise err + return + +class SolisData: + def __init__(self, domain: str = None): + load_dotenv() + if domain is None: + domain = f"{SOLIS_CLOUD_API_URL}:{SOLIS_CLOUD_API_PORT}" + self.domain = domain + self.api_key = os.environ.get('SOLIS_CLOUD_API_KEY') + api_secret_str = os.environ.get('SOLIS_CLOUD_API_KEY_SECRET') + if not self.api_key or not api_secret_str: + raise ValueError("SOLIS_CLOUD_API_KEY or SOLIS_CLOUD_API_KEY_SECRET environment variable is not set") + self.api_secret = api_secret_str.encode('utf-8') # Convert to binary string + self.domain = domain + + async def get_inverter_list(self, soliscloud: SoliscloudAPI): + """Fetch the list of inverters""" + inverter_list = await soliscloud.inverter_list( + self.api_key, + self.api_secret, + page_no=1, + page_size=100 + ) + return inverter_list + + def process_solis_data(self, live_generation_kw: pd.DataFrame) -> pd.DataFrame: + """ + Process the Solis data and convert it to a DataFrame with timestamp and power_kw columns. + + :param live_generation_kw: DataFrame with original Solis data + :return: DataFrame with processed data + """ + # Create a copy of the DataFrame to avoid SettingWithCopyWarning + processed_df = live_generation_kw[['timestamp', 'power_kw']].copy() + + # Ensure the timestamp is in the correct format + processed_df.loc[:, 'timestamp'] = pd.to_datetime(processed_df['timestamp']) + + # Sort by timestamp + processed_df = processed_df.sort_values('timestamp') + + # Reset the index + processed_df = processed_df.reset_index(drop=True) + + return processed_df + + async def get_solis_data(self) -> pd.DataFrame: + """ + Get live PV generation data from Solis API for the last 7 days + :return: DataFrame with timestamp and power_kw columns + """ + async with ClientSession() as websession: + soliscloud = SoliscloudAPI(self.domain, websession) + + inverter_list = await self.get_inverter_list(soliscloud) + if not inverter_list: + raise ValueError("No inverters found") + + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(days=7) + + data_list = [] + + for inverter in inverter_list: + inverter_sn = inverter['sn'] + for day in range(7): + current_date = (end_time - timedelta(days=day)).strftime('%Y-%m-%d') + try: + inverter_day_data = await soliscloud.inverter_day( + self.api_key, + self.api_secret, + currency='USD', + time=current_date, + time_zone=0, + inverter_sn=inverter_sn + ) + + # Check if inverter_day_data is a list of dictionaries + if isinstance(inverter_day_data, list) and all(isinstance(item, dict) for item in inverter_day_data): + for data_point in inverter_day_data: + timestamp = datetime.fromtimestamp(int(data_point['dataTimestamp']) / 1000, tz=timezone.utc) + if start_time <= timestamp <= end_time: + data_list.append({ + "timestamp": timestamp.strftime('%Y-%m-%d %H:%M:%S'), + "power_kw": float(data_point['pac']) / 1000, # Convert W to kW + "inverter_sn": inverter_sn + }) + else: + print(f"Unexpected data format for inverter {inverter_sn} on {current_date}") + print(f"Received data: {inverter_day_data}") + + except Exception as e: + print(f"Error fetching data for inverter {inverter_sn} on {current_date}: {e}") + print(f"Received data: {inverter_day_data}") + + # Avoid rate limiting + await asyncio.sleep(0.5) # 2 times/sec limit + + # Convert the list to a DataFrame + live_generation_kw = pd.DataFrame(data_list) + + if live_generation_kw.empty: + return pd.DataFrame(columns=["timestamp", "power_kw"]) + + # Convert to datetime + live_generation_kw["timestamp"] = pd.to_datetime(live_generation_kw["timestamp"]) + + # Sort by timestamp + live_generation_kw = live_generation_kw.sort_values("timestamp") + + # Process the data to match the desired format + processed_df = self.process_solis_data(live_generation_kw) + processed_df = processed_df.reset_index(drop=True) + + return processed_df + +async def get_solis_data(): + solis_data = SolisData() + return await solis_data.get_solis_data() \ No newline at end of file diff --git a/quartz_solar_forecast/pydantic_models.py b/quartz_solar_forecast/pydantic_models.py index e122ebe5..71e801d3 100644 --- a/quartz_solar_forecast/pydantic_models.py +++ b/quartz_solar_forecast/pydantic_models.py @@ -21,6 +21,6 @@ class PVSite(BaseModel): ) inverter_type: str = Field( default=None, - description="The type of inverter used, either 'solaredge' or 'enphase'", - json_schema_extra=["solaredge", "enphase", None], + description="The type of inverter used", + json_schema_extra=["enphase", "solis", None], )