docs to add inverters (#134)
* docs

* Update README.md

* Delete quartz_solar_forecast/inverters/image/README directory

* mocks api

* changes ts to utc

* mock auth code

* mock access token

* auth code fixture

* inpmock

* escape

* mock inp

* test1

* process funcs

* process enphase data test

* rm redundant deps

* process pv test
aryanbhosale authored Jun 27, 2024
1 parent 9eee595 commit 0e2b1af
Showing 6 changed files with 252 additions and 105 deletions.
65 changes: 40 additions & 25 deletions quartz_solar_forecast/data.py
@@ -143,6 +143,42 @@ def format_nwp_data(df: pd.DataFrame, nwp_source:str, site: PVSite):
    )
    return data_xr

def process_pv_data(live_generation_kw: pd.DataFrame, ts: pd.Timestamp, site: PVSite) -> xr.Dataset:
    """
    Process PV data and create an xarray Dataset.
    :param live_generation_kw: DataFrame containing live generation data, or None
    :param ts: Current timestamp
    :param site: PV site information
    :return: xarray Dataset containing processed PV data
    """
    if live_generation_kw is not None:
        # get the most recent data
        recent_pv_data = live_generation_kw[live_generation_kw['timestamp'] <= ts]
        power_kw = np.array([np.array(recent_pv_data["power_kw"].values, dtype=np.float64)])
        timestamp = recent_pv_data['timestamp'].values
    else:
        # make fake pv data, this is where we could add history of a pv system
        power_kw = [[np.nan]]
        timestamp = [ts]

    da = xr.DataArray(
        data=power_kw,
        dims=["pv_id", "timestamp"],
        coords=dict(
            longitude=(["pv_id"], [site.longitude]),
            latitude=(["pv_id"], [site.latitude]),
            timestamp=timestamp,
            pv_id=[1],
            kwp=(["pv_id"], [site.capacity_kwp]),
            tilt=(["pv_id"], [site.tilt]),
            orientation=(["pv_id"], [site.orientation]),
        ),
    )
    da = da.to_dataset(name="generation_kw")

    return da

def make_pv_data(site: PVSite, ts: pd.Timestamp) -> xr.Dataset:
    """
    Make PV data by combining live data from SolarEdge or Enphase and fake PV data.
@@ -151,7 +187,8 @@ def make_pv_data(site: PVSite, ts: pd.Timestamp) -> xr.Dataset:
    :param ts: the timestamp of the site
    :return: The combined PV dataset in xarray form
    """
    # Initialize live_generation_kw to None
    live_generation_kw = None

    # Check if the site has an inverter type specified
    if site.inverter_type == 'solaredge':
@@ -172,29 +209,7 @@ def make_pv_data(site: PVSite, ts: pd.Timestamp) -> xr.Dataset:
        # If no inverter type is specified or not recognized, set live_generation_kw to None
        live_generation_kw = None

    # Process the PV data
    da = process_pv_data(live_generation_kw, ts, site)

    return da
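
For orientation, here is a minimal usage sketch of the refactored `make_pv_data()` (not part of the diff); the site values are borrowed from the test fixtures further down and are illustrative only:

```python
import pandas as pd

from quartz_solar_forecast.pydantic_models import PVSite
from quartz_solar_forecast.data import make_pv_data

# Illustrative site; with inverter_type="enphase" the live-data path is taken,
# which requires Enphase credentials in your .env. An unspecified or
# unrecognized inverter type falls back to the fake-data path instead.
site = PVSite(latitude=51.75, longitude=-1.25, capacity_kwp=1.25,
              tilt=35, orientation=180, inverter_type="enphase")

# Combine live (or fake) PV readings into the xarray Dataset used by the model
ds = make_pv_data(site=site, ts=pd.Timestamp.now())
print(ds["generation_kw"])
```
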
59 changes: 59 additions & 0 deletions quartz_solar_forecast/inverters/README.md
@@ -0,0 +1,59 @@
# Adding an Inverter to Quartz Solar Forecast

The aim of this module is to allow users to add their inverter brands to Quartz Solar Forecast and use live data instead of the default fake data.

Quartz Solar Forecast currently supports Enphase inverters, and we are working on adding support for a wider range of solar inverters.

## Important Directories & Files

```markdown
Open-Source-Quartz-Solar-Forecast/
├── examples/
│ └── inverter_example.py
├── quartz_solar_forecast/
│ ├── data.py
│ ├── pydantic_models.py
│ └── inverters/
├── tests/
│ └── data/
│ └── test_make_pv_data.py
```

## What each Directory holds

1. `examples/`
   * `inverter_example.py`: Builds the input data based on the inverter type, runs the ML model both with live inverter data and with no data, and plots a comparison using `plotly`. This is the file you need to run to execute the ML model. An example output with Enphase is shown below:

![example_enphase_output](https://github.com/aryanbhosale/Open-Source-Quartz-Solar-Forecast/assets/36108149/7127a00e-c081-4f5e-a342-2be2e2efe00c)

2. `quartz_solar_forecast/`:
   * `data.py`: Contains the `make_pv_data()` function, which checks the inverter type and constructs an `xarray` dataset
   * `pydantic_models.py`: Contains the `PVSite` class
   * `inverters/`:
     * This is the directory where you'd create a new `<inverter_name>.py` file, alongside the existing ones, to add your inverter (a minimal sketch of such a module is given after this list)
     * You will need to follow the appropriate authentication flow, as described in the documentation of the inverter you're adding
     * The model needs the past 7 days of data at 5-minute intervals. An example with Enphase is given below

![example_enphase_data](https://github.com/aryanbhosale/Open-Source-Quartz-Solar-Forecast/assets/36108149/436c688c-2e59-4047-abfc-754acb629343)

     * Once all the processing is done, make sure your function returns a `pd.DataFrame` with 2 columns, namely:

       * `timestamp`: `timestamp=datetime.fromtimestamp(interval_end_time_in_unix_epochs, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')`, then convert the timestamp column with `pd.to_datetime`
       * `power_kw`: Power in **kilowatts**. An example with the formatted `pd.DataFrame` is shown below:
![example_enphase_formatted_dataframe](https://github.com/aryanbhosale/Open-Source-Quartz-Solar-Forecast/assets/36108149/482b2f2a-e3f5-4a1a-97f1-2d322a1444d5)

3. `tests/`
   * `data/`
     * `test_make_pv_data.py`: Tests the `make_pv_data()` function from `data.py` with `pytest`, mocking the various inverter types as well as `None`
     * Run it using `pytest tests/data/test_make_pv_data.py`
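
To make the expected contract concrete, below is a minimal sketch of a hypothetical `quartz_solar_forecast/inverters/myinverter.py`. The endpoint, authentication, and response fields are placeholders rather than a real inverter API; only the returned `pd.DataFrame` shape (a `timestamp` column in UTC and a `power_kw` column in kilowatts) reflects the actual requirement:

```python
import pandas as pd
import requests
from datetime import datetime, timezone

def get_myinverter_data(system_id: str) -> pd.DataFrame:
    """Fetch the past 7 days of 5-minute production data (hypothetical API)."""
    # Placeholder endpoint and token -- replace with your inverter's real auth flow
    response = requests.get(
        f"https://api.example-inverter.com/v1/systems/{system_id}/telemetry",
        headers={"Authorization": "Bearer <access_token>"},
    )

    data_list = []
    for interval in response.json()["intervals"]:
        # Convert the interval's unix epoch end time to a UTC timestamp string
        timestamp = datetime.fromtimestamp(
            interval["end_at"], tz=timezone.utc
        ).strftime("%Y-%m-%d %H:%M:%S")
        # power_kw must be in kilowatts; this hypothetical API reports watts
        data_list.append({"timestamp": timestamp,
                          "power_kw": interval["power_w"] / 1000})

    df = pd.DataFrame(data_list)
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    return df
```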

## How to Setup

1. Ensure you have a Linux machine, such as Ubuntu or Kali, installed
2. Navigate into the `Open-Source-Quartz-Solar-Forecast` directory and create a virtual environment by entering `python -m venv venv`
3. Activate the virtual environment by entering `source venv/bin/activate`
4. Install the requirements by entering `pip install -r requirements.txt` and `pip install -e .`
5. Install `plotly` by entering `pip install plotly`
6. Create a `.env` file in the root directory, i.e. `Open-Source-Quartz-Solar-Forecast`
7. Add your solar inverter's user credentials as environment variables in the `.env` file; refer to the `.env.example` file for Enphase & SolarEdge credential examples
8. Run the `inverter_example.py` file by entering `python examples/inverter_example.py`
48 changes: 31 additions & 17 deletions quartz_solar_forecast/inverters/enphase.py
@@ -98,6 +98,35 @@ def get_enphase_access_token():
    return access_token


def process_enphase_data(data_json: dict, start_at: int) -> pd.DataFrame:
    """
    Process the JSON data from Enphase API and convert it to a DataFrame.
    :param data_json: JSON data from Enphase API
    :param start_at: Start timestamp for filtering data
    :return: DataFrame with processed data
    """
    # Initialize an empty list to store the data
    data_list = []

    # Loop through the intervals and collect the data for the last week
    for interval in data_json['intervals']:
        end_at = interval['end_at']
        if end_at >= start_at:
            # Convert to UTC
            timestamp = datetime.fromtimestamp(end_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

            # Append the data to the list
            data_list.append({"timestamp": timestamp, "power_kw": interval['powr']/1000})

    # Convert the list to a DataFrame
    live_generation_kw = pd.DataFrame(data_list)

    # Convert to datetime
    live_generation_kw["timestamp"] = pd.to_datetime(live_generation_kw["timestamp"])

    return live_generation_kw

def get_enphase_data(enphase_system_id: str) -> pd.DataFrame:
    """
    Get live PV generation data from Enphase API v4
@@ -131,23 +160,8 @@ def get_enphase_data(enphase_system_id: str) -> pd.DataFrame:

    # Convert the decoded data into JSON format
    data_json = json.loads(decoded_data)

    # Process the data using the new function
    live_generation_kw = process_enphase_data(data_json, start_at)

    return live_generation_kw
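
As a quick illustration (not part of the commit), `process_enphase_data` can be exercised directly with a small synthetic payload shaped like the Enphase v4 response above:

```python
from quartz_solar_forecast.inverters.enphase import process_enphase_data

# Synthetic payload mirroring the Enphase v4 response shape used above
data_json = {
    "intervals": [
        {"end_at": 1718531100, "powr": 624, "enwh": 52},
        {"end_at": 1718531400, "powr": 684, "enwh": 57},
    ]
}

df = process_enphase_data(data_json, start_at=1718530896)
print(df)  # two rows: UTC timestamps and power_kw = powr / 1000
```
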
63 changes: 0 additions & 63 deletions tests/data/test_make_pv_data.py

This file was deleted.

66 changes: 66 additions & 0 deletions tests/data/test_process_pv_data.py
@@ -0,0 +1,66 @@
import pytest
import pandas as pd
import numpy as np
import xarray as xr
from datetime import datetime, timezone
from quartz_solar_forecast.data import process_pv_data
from quartz_solar_forecast.pydantic_models import PVSite

@pytest.fixture
def sample_site():
    return PVSite(
        latitude=51.75,
        longitude=-1.25,
        capacity_kwp=1.25,
        tilt=35,
        orientation=180,
        inverter_type="enphase"
    )

@pytest.fixture
def sample_timestamp():
    timestamp = datetime.now().timestamp()
    timestamp_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    return pd.to_datetime(timestamp_str)

@pytest.fixture
def sample_live_generation():
    return pd.DataFrame({
        'timestamp': [
            pd.Timestamp('2024-06-16 10:00:00'),
            pd.Timestamp('2024-06-16 10:05:00'),
            pd.Timestamp('2024-06-16 10:10:00')
        ],
        'power_kw': [0.75, 0.80, 0.78]
    })

def test_process_pv_data_with_live_data(sample_site, sample_timestamp, sample_live_generation):
    result = process_pv_data(sample_live_generation, sample_timestamp, sample_site)

    assert isinstance(result, xr.Dataset)
    assert 'generation_kw' in result.data_vars
    assert set(result.coords) == {'longitude', 'latitude', 'timestamp', 'pv_id', 'kwp', 'tilt', 'orientation'}
    assert result.pv_id.values.tolist() == [1]
    assert result.longitude.values.tolist() == [sample_site.longitude]
    assert result.latitude.values.tolist() == [sample_site.latitude]
    assert result.kwp.values.tolist() == [sample_site.capacity_kwp]
    assert result.tilt.values.tolist() == [sample_site.tilt]
    assert result.orientation.values.tolist() == [sample_site.orientation]
    assert len(result.timestamp) <= len(sample_live_generation)
    assert np.all(result.timestamp.values <= sample_timestamp)

def test_process_pv_data_without_live_data(sample_site, sample_timestamp):
    result = process_pv_data(None, sample_timestamp, sample_site)

    assert isinstance(result, xr.Dataset)
    assert 'generation_kw' in result.data_vars
    assert set(result.coords) == {'longitude', 'latitude', 'timestamp', 'pv_id', 'kwp', 'tilt', 'orientation'}
    assert result.pv_id.values.tolist() == [1]
    assert result.longitude.values.tolist() == [sample_site.longitude]
    assert result.latitude.values.tolist() == [sample_site.latitude]
    assert result.kwp.values.tolist() == [sample_site.capacity_kwp]
    assert result.tilt.values.tolist() == [sample_site.tilt]
    assert result.orientation.values.tolist() == [sample_site.orientation]
    assert len(result.timestamp) == 1
    assert result.timestamp.values[0] == sample_timestamp
    assert np.isnan(result.generation_kw.values[0][0])
56 changes: 56 additions & 0 deletions tests/inverters/test_process_enphase_data.py
@@ -0,0 +1,56 @@
import pytest
import pandas as pd
import numpy as np
from quartz_solar_forecast.inverters.enphase import process_enphase_data

@pytest.fixture
def sample_data():
    return {
        'system_id': 3136663,
        'granularity': 'week',
        'total_devices': 4,
        'start_at': 1718530896,
        'end_at': 1719134971,
        'items': 'intervals',
        'intervals': [
            {'end_at': 1718531100, 'devices_reporting': 4, 'powr': 624, 'enwh': 52},
            {'end_at': 1718531400, 'devices_reporting': 4, 'powr': 684, 'enwh': 57},
            {'end_at': 1718531700, 'devices_reporting': 4, 'powr': 672, 'enwh': 56},
        ]
    }

def test_process_enphase_data(sample_data):
    # Set start_at to just after the first interval's end time,
    # so the first interval gets filtered out
    start_at = sample_data['intervals'][0]['end_at'] + 1

    # Process the data
    result = process_enphase_data(sample_data, start_at)

    # Check if the result is a DataFrame
    assert isinstance(result, pd.DataFrame)

    # Check if the DataFrame has the expected columns
    assert set(result.columns) == {'timestamp', 'power_kw'}

    # Check if the timestamp column is of datetime type
    assert pd.api.types.is_datetime64_any_dtype(result['timestamp'])

    # Check if power_kw values are correctly calculated (divided by 1000)
    expected_power_values = [interval['powr'] / 1000 for interval in sample_data['intervals']]
    assert all(value in expected_power_values for value in result['power_kw'])

    # Convert start_at to a naive UTC timestamp
    start_at_timestamp = pd.Timestamp(start_at, unit='s').tz_localize('UTC').tz_convert(None)

    # Check if all timestamps are after the start_at time
    assert np.all(result['timestamp'] >= start_at_timestamp)

    # Check if the number of rows is less than or equal to the number of intervals
    assert len(result) <= len(sample_data['intervals'])

    # Check if timestamps are formatted correctly
    expected_timestamps = [
        pd.Timestamp(interval['end_at'], unit='s').tz_localize('UTC').tz_convert(None).strftime('%Y-%m-%d %H:%M:%S')
        for interval in sample_data['intervals']
    ]
    assert all(ts.strftime('%Y-%m-%d %H:%M:%S') in expected_timestamps for ts in result['timestamp'])
