Skip to content

Commit

Permalink
Create abstract base class for inverters (#183)
Browse files Browse the repository at this point in the history
* wip

* wip

* solarman

* update token

* update inverters

* undo some changes

* add docstring

* mock inverter docstring

* remove dotenv

* update givenergy

* enphase

* try to fix tests running twice

* delete event

* pr comments

* revert workflow changes

* split inverters into separate modules

* import

* Revert "import"

This reverts commit f9f3c5f.

* Revert "split inverters into separate modules"

This reverts commit 94a9e70.

* add pydantic_settings

* set config within settings classes

* use named argument

* fix access token issue

* add a line to the docs
  • Loading branch information
mduffin95 authored Aug 28, 2024
1 parent ff7b1c8 commit 3e11e48
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 133 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ quartz_solar_forecast.egg-info
.env
venv
frontend/node_modules
frontend/.vite
frontend/.vite
__pycache__/
.cache.sqlite
93 changes: 17 additions & 76 deletions quartz_solar_forecast/data.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,19 @@
""" Function to get NWP data and create fake PV dataset"""
import ssl
from datetime import datetime, timedelta
import os
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd
import xarray as xr
import openmeteo_requests
import pandas as pd
import requests_cache
import asyncio

import xarray as xr
from retry_requests import retry
from typing import Optional

from quartz_solar_forecast.pydantic_models import PVSite
from quartz_solar_forecast.inverters.enphase import get_enphase_data
from quartz_solar_forecast.inverters.solis import get_solis_data
from quartz_solar_forecast.inverters.givenergy import get_givenergy_data
from quartz_solar_forecast.inverters.solarman import get_solarman_data

ssl._create_default_https_context = ssl._create_unverified_context

from dotenv import load_dotenv

load_dotenv()

def get_nwp(site: PVSite, ts: datetime, nwp_source: str = "icon") -> xr.Dataset:
"""
Expand All @@ -41,13 +32,13 @@ def get_nwp(site: PVSite, ts: datetime, nwp_source: str = "icon") -> xr.Dataset:

# Define the variables we want. Visibility is handled separately after the main request
variables = [
"temperature_2m",
"precipitation",
"cloud_cover_low",
"cloud_cover_mid",
"cloud_cover_high",
"wind_speed_10m",
"shortwave_radiation",
"temperature_2m",
"precipitation",
"cloud_cover_low",
"cloud_cover_mid",
"cloud_cover_high",
"wind_speed_10m",
"shortwave_radiation",
"direct_radiation"
]

Expand All @@ -59,7 +50,7 @@ def get_nwp(site: PVSite, ts: datetime, nwp_source: str = "icon") -> xr.Dataset:
# check whether the time stamp is more than 3 months in the past
if (datetime.now() - ts).days > 90:
print("Warning: The requested timestamp is more than 3 months in the past. The weather data are provided by a reanalyse model and not ICON or GFS.")

# load data from open-meteo Historical Weather API
url = "https://archive-api.open-meteo.com/v1/archive"

Expand Down Expand Up @@ -104,7 +95,7 @@ def get_nwp(site: PVSite, ts: datetime, nwp_source: str = "icon") -> xr.Dataset:
hourly_data["dswrf"] = hourly.Variables(6).ValuesAsNumpy()
hourly_data["dlwrf"] = hourly.Variables(7).ValuesAsNumpy()

# handle visibility
# handle visibility
if (datetime.now() - ts).days <= 90:
# load data from open-meteo gfs model
params = {
Expand Down Expand Up @@ -144,61 +135,11 @@ def format_nwp_data(df: pd.DataFrame, nwp_source:str, site: PVSite):
)
return data_xr

def fetch_enphase_data() -> Optional[pd.DataFrame]:
system_id = os.getenv('ENPHASE_SYSTEM_ID')
if not system_id:
print("Error: Enphase inverter ID is not provided in the environment variables.")
return None
return get_enphase_data(system_id)

def fetch_solis_data() -> Optional[pd.DataFrame]:
try:
return asyncio.run(get_solis_data())
except Exception as e:
print(f"Error retrieving Solis data: {str(e)}")
return None

def fetch_givenergy_data() -> Optional[pd.DataFrame]:
try:
return get_givenergy_data()
except Exception as e:
print(f"Error retrieving GivEnergy data: {str(e)}")
return None

def fetch_solarman_data() -> pd.DataFrame:
try:
end_date = datetime.now()
start_date = end_date - timedelta(weeks=1)
solarman_data = get_solarman_data(start_date, end_date)

# Filter out rows with null power_kw values
valid_data = solarman_data.dropna(subset=['power_kw'])

if valid_data.empty:
print("No valid Solarman data found.")
return pd.DataFrame(columns=['timestamp', 'power_kw'])

return valid_data
except Exception as e:
print(f"Error retrieving Solarman data: {str(e)}")
return pd.DataFrame(columns=['timestamp', 'power_kw'])

def fetch_live_generation_data(inverter_type: str) -> Optional[pd.DataFrame]:
if inverter_type == 'enphase':
return fetch_enphase_data()
elif inverter_type == 'solis':
return fetch_solis_data()
elif inverter_type == 'givenergy':
return fetch_givenergy_data()
elif inverter_type == 'solarman':
return fetch_solarman_data()
else:
return pd.DataFrame(columns=['timestamp', 'power_kw'])

def process_pv_data(live_generation_kw: Optional[pd.DataFrame], ts: pd.Timestamp, site: 'PVSite') -> xr.Dataset:
"""
Process PV data and create an xarray Dataset.
:param live_generation_kw: DataFrame containing live generation data, or None
:param ts: Current timestamp
:param site: PV site information
Expand Down Expand Up @@ -231,15 +172,15 @@ def process_pv_data(live_generation_kw: Optional[pd.DataFrame], ts: pd.Timestamp

return da

def make_pv_data(site: 'PVSite', ts: pd.Timestamp) -> xr.Dataset:
def make_pv_data(site: PVSite, ts: pd.Timestamp) -> xr.Dataset:
"""
Make PV data by combining live data from various inverters.
:param site: the PV site
:param ts: the timestamp of the site
:return: The combined PV dataset in xarray form
"""
live_generation_kw = fetch_live_generation_data(site.inverter_type)
live_generation_kw = site.get_inverter().get_data(ts)
# Process the PV data
da = process_pv_data(live_generation_kw, ts, site)

Expand Down
1 change: 1 addition & 0 deletions quartz_solar_forecast/inverters/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Open-Source-Quartz-Solar-Forecast/
* `pydantic_models.py`: Contains the PVSite class
* `inverters/`:
* This is the directory where you'd want to create a new file among the other `<inverter_name>.py` files to add your inverter
* You will need to create a new inverter model that extends `AbstractInverter` which is defined in `inverter.py`
* You will need to follow the appropriate authentication flow as mentioned in the documentation of the inverter you're trying to add
* We need the past 7 days data formatted in intervals of 5 minutes for this model. Given below is an example with Enphase

Expand Down
69 changes: 51 additions & 18 deletions quartz_solar_forecast/inverters/enphase.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,50 @@
import http.client
import os
from typing import Optional

import pandas as pd
import json
import base64
from datetime import datetime, timedelta, timezone

from dotenv import load_dotenv
from urllib.parse import urlencode

from quartz_solar_forecast.inverters.inverter import AbstractInverter
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

load_dotenv()

from urllib.parse import urlencode
class EnphaseSettings(BaseSettings):
model_config = SettingsConfigDict(env_file='.env', extra='ignore')

client_id: str = Field(alias="ENPHASE_CLIENT_ID")
system_id: str = Field(alias="ENPHASE_SYSTEM_ID")
api_key: str = Field(alias="ENPHASE_API_KEY")
client_secret: str = Field(alias="ENPHASE_CLIENT_SECRET")


class EnphaseInverter(AbstractInverter):

def get_enphase_auth_url():
def __init__(self, settings: EnphaseSettings):
self.__settings = settings

def get_data(self, ts: pd.Timestamp) -> Optional[pd.DataFrame]:
return get_enphase_data(self.__settings)


def get_enphase_auth_url(settings: Optional[EnphaseSettings] = None):
"""
Generate the authorization URL for the Enphase API.
:param None
:param settings: the Enphase settings
:return: Authentication URL
"""
client_id = os.getenv('ENPHASE_CLIENT_ID')
if settings is None:
# Because this uses env variables we don't want to set it as a default argument, otherwise it will be evaluated
# even if the method is not called
settings = EnphaseSettings()

client_id = settings.client_id

redirect_uri = (
"https://api.enphaseenergy.com/oauth/redirect_uri" # Or your own redirect URI
Expand Down Expand Up @@ -51,17 +77,23 @@ def get_enphase_authorization_code(auth_url):
return code


def get_enphase_access_token(auth_code=None):
def get_enphase_access_token(auth_code: Optional[str] = None, settings: Optional[EnphaseSettings] = None):
"""
Obtain an access token for the Enphase API using the Authorization Code Grant flow.
:param auth_code: Optional authorization code. If not provided, it will be obtained.
:param settings: Optional Enphase settings
:return: Access Token
"""
client_id = os.getenv('ENPHASE_CLIENT_ID')
client_secret = os.getenv('ENPHASE_CLIENT_SECRET')
if settings is None:
# Because this uses env variables we don't want to set it as a default argument, otherwise it will be evaluated
# even if the method is not called
settings = EnphaseSettings()

client_id = settings.client_id
client_secret = settings.client_secret

if auth_code is None:
auth_url = get_enphase_auth_url()
auth_url = get_enphase_auth_url(settings)
auth_code = get_enphase_authorization_code(auth_url)

credentials = f"{client_id}:{client_secret}"
Expand Down Expand Up @@ -90,7 +122,7 @@ def get_enphase_access_token(auth_code=None):
return access_token


def process_enphase_data(data_json: dict, start_at: int) -> pd.DataFrame:
def process_enphase_data(data_json: dict, start_at: int) -> pd.DataFrame:
# Check if 'intervals' key exists in the response
if 'intervals' not in data_json:
return pd.DataFrame(columns=["timestamp", "power_kw"])
Expand All @@ -106,7 +138,7 @@ def process_enphase_data(data_json: dict, start_at: int) -> pd.DataFrame:
timestamp = datetime.fromtimestamp(end_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

# Append the data to the list
data_list.append({"timestamp": timestamp, "power_kw": interval['powr']/1000})
data_list.append({"timestamp": timestamp, "power_kw": interval['powr'] / 1000})

# Convert the list to a DataFrame
live_generation_kw = pd.DataFrame(data_list)
Expand All @@ -120,18 +152,19 @@ def process_enphase_data(data_json: dict, start_at: int) -> pd.DataFrame:

return live_generation_kw

def get_enphase_data(enphase_system_id: str) -> pd.DataFrame:

def get_enphase_data(settings: EnphaseSettings) -> pd.DataFrame:
"""
Get live PV generation data from Enphase API v4
:param settings: the Enphase settings
:param enphase_system_id: System ID for Enphase API
:return: Live PV generation in Watt-hours, assumes to be a floating-point number
"""
api_key = os.getenv('ENPHASE_API_KEY')
access_token = os.getenv('ENPHASE_ACCESS_TOKEN')

# If access token is not in environment variables, get a new one
if not access_token:
access_token = get_enphase_access_token()
access_token = get_enphase_access_token(settings=settings)

# Set the start time to 1 week ago
start_at = int((datetime.now() - timedelta(weeks=1)).timestamp())
Expand All @@ -142,11 +175,11 @@ def get_enphase_data(enphase_system_id: str) -> pd.DataFrame:
conn = http.client.HTTPSConnection("api.enphaseenergy.com")
headers = {
"Authorization": f"Bearer {access_token}",
"key": api_key
"key": settings.api_key
}

# Add the system_id and duration parameters to the URL
url = f"/api/v4/systems/{enphase_system_id}/telemetry/production_micro?start_at={start_at}&granularity={granularity}"
url = f"/api/v4/systems/{settings.system_id}/telemetry/production_micro?start_at={start_at}&granularity={granularity}"
conn.request("GET", url, headers=headers)

res = conn.getresponse()
Expand All @@ -161,4 +194,4 @@ def get_enphase_data(enphase_system_id: str) -> pd.DataFrame:
# Process the data using the new function
live_generation_kw = process_enphase_data(data_json, start_at)

return live_generation_kw
return live_generation_kw
41 changes: 32 additions & 9 deletions quartz_solar_forecast/inverters/givenergy.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,41 @@
import os
from typing import Optional

import requests
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from quartz_solar_forecast.inverters.inverter import AbstractInverter


class GivEnergySettings(BaseSettings):
model_config = SettingsConfigDict(env_file='.env', extra='ignore')

api_key: str = Field(alias="GIVENERGY_API_KEY")


def get_inverter_serial_number():
class GivEnergyInverter(AbstractInverter):

def __init__(self, settings: GivEnergySettings):
self.__settings = settings

def get_data(self, ts: pd.Timestamp) -> Optional[pd.DataFrame]:
try:
return get_givenergy_data(self.__settings)
except Exception as e:
print(f"Error retrieving GivEnergy data: {e}")
return None


def get_inverter_serial_number(settings: GivEnergySettings):
"""
Fetch the inverter serial number from the GivEnergy communication device API.
:return: Inverter serial number as a string
"""
api_key = os.getenv('GIVENERGY_API_KEY')
api_key = settings.api_key

if not api_key:
raise ValueError("GIVENERGY_API_KEY not set in environment variables")
Expand All @@ -38,18 +60,19 @@ def get_inverter_serial_number():
inverter_serial_number = data[0]['inverter']['serial']
return inverter_serial_number

def get_givenergy_data():

def get_givenergy_data(settings: GivEnergySettings):
"""
Fetch the latest data from the GivEnergy API and return a DataFrame.
:return: DataFrame with timestamp and power_kw columns
"""
api_key = os.getenv('GIVENERGY_API_KEY')
api_key = settings.api_key

if not api_key:
raise ValueError("GIVENERGY_API_KEY not set in environment variables")

inverter_serial_number = get_inverter_serial_number()
inverter_serial_number = get_inverter_serial_number(settings)

url = f'https://api.givenergy.cloud/v1/inverter/{inverter_serial_number}/system-data/latest'

Expand Down
Loading

0 comments on commit 3e11e48

Please sign in to comment.