Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow use of PV system data from PV Sites database #226

Merged
merged 30 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8c8c3d5
minor tidy
dfulu Sep 21, 2023
2640b83
add pv metadata capacity and observed capacity
dfulu Sep 21, 2023
f095d57
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 21, 2023
79d5d30
add pipeline to pull data from pvsites database
dfulu Sep 21, 2023
f00de4f
add tests for pv site
dfulu Sep 22, 2023
5a244a3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 22, 2023
5d5c5ac
tidy
dfulu Sep 25, 2023
96b79cf
add PV system dropout functions
dfulu Sep 25, 2023
62e4a0f
add PV systems as option to PVNet datapipe
dfulu Sep 25, 2023
2ad499a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 25, 2023
3facd1a
add pv dropout tests
dfulu Sep 25, 2023
8960c19
Merge branch 'pv_inputs_from_database' of https://github.com/openclim…
dfulu Sep 25, 2023
522e55d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 25, 2023
62e89c3
fix tests
dfulu Sep 26, 2023
6ab54d3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 26, 2023
35a7074
assertion fix
dfulu Sep 26, 2023
a59afe3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 26, 2023
9330503
fix comma
dfulu Sep 26, 2023
180ceee
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 26, 2023
eae86d0
add pv fillna
dfulu Sep 26, 2023
46177b4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 26, 2023
301cd52
bug fix
dfulu Sep 26, 2023
9ccadeb
fix select ID function for multiprocessing
dfulu Oct 3, 2023
d539a77
Update ocf_datapipes/training/common.py
dfulu Oct 24, 2023
77ecaa8
Merge branch 'main' into pv_inputs_from_database
dfulu Oct 24, 2023
2fdbec1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 24, 2023
c583e76
clean up
dfulu Oct 24, 2023
ee6a69f
pv capacity name change
dfulu Oct 24, 2023
8bae600
Merge branch 'pv_inputs_from_database' of https://github.com/openclim…
dfulu Oct 24, 2023
2c39650
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ocf_datapipes/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ class PV(DataSourceMixin, StartEndDatetimeMixin, TimeResolutionMixin, XYDimensio
description="Tthe CSV files describing each PV system.",
)

pv_ml_ids: List[int] = Field(
None,
description="List of the ML IDs of the PV systems you'd like to filter to.",
)

is_live: bool = Field(
False, description="Option if to use live data from the nowcasting pv database"
)
Expand Down
3 changes: 2 additions & 1 deletion ocf_datapipes/convert/numpy/batch/pv.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ def __iter__(self) -> NumpyBatch:
BatchKey.pv_t0_idx: xr_data.attrs["t0_idx"],
BatchKey.pv_ml_id: xr_data["ml_id"].values,
BatchKey.pv_id: xr_data["pv_system_id"].values.astype(np.float32),
BatchKey.pv_capacity_watt_power: xr_data["capacity_watt_power"].values,
BatchKey.pv_observed_capacity_wp: (xr_data["observed_capacity_wp"].values),
BatchKey.pv_nominal_capacity_wp: (xr_data["nominal_capacity_wp"].values),
BatchKey.pv_time_utc: datetime64_to_float(xr_data["time_utc"].values),
BatchKey.pv_latitude: xr_data["latitude"].values,
BatchKey.pv_longitude: xr_data["longitude"].values,
Expand Down
1 change: 1 addition & 0 deletions ocf_datapipes/load/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ocf_datapipes.load.gsp.gsp_national import OpenGSPNationalIterDataPipe as OpenGSPNational
from ocf_datapipes.load.nwp.providers.gfs import OpenGFSForecastIterDataPipe as OpenGFSForecast
from ocf_datapipes.load.pv.database import OpenPVFromDBIterDataPipe as OpenPVFromDB
from ocf_datapipes.load.pv.database import OpenPVFromPVSitesDBIterDataPipe as OpenPVFromPVSitesDB
from ocf_datapipes.load.pv.pv import OpenPVFromNetCDFIterDataPipe as OpenPVFromNetCDF

from .configuration import OpenConfigurationIterDataPipe as OpenConfiguration
Expand Down
105 changes: 103 additions & 2 deletions ocf_datapipes/load/pv/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np
import pandas as pd
from nowcasting_datamodel.connection import DatabaseConnection
from nowcasting_datamodel.models.base import Base_PV
from nowcasting_datamodel.models.base import Base_Forecast, Base_PV
from nowcasting_datamodel.models.pv import (
PVSystem,
PVSystemSQL,
Expand All @@ -17,6 +17,7 @@
solar_sheffield_passiv,
)
from nowcasting_datamodel.read.read_pv import get_pv_systems, get_pv_yield
from sqlalchemy import text
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe

Expand Down Expand Up @@ -75,6 +76,9 @@ def __iter__(self):
load_extra_minutes=self.load_extra_minutes,
)

# Database record is very short. Set observed max to NaN
pv_metadata["observed_capacity_watt_power"] = np.nan

# select metadata that is in pv_power
logger.debug(
f"There are currently {len(pv_metadata.index)} pv system in the metadata, "
Expand All @@ -91,7 +95,8 @@ def __iter__(self):
# Compile data into an xarray DataArray
data_xr = put_pv_data_into_an_xr_dataarray(
df_gen=pv_power,
system_capacities=pv_metadata.capacity_watt_power,
observed_system_capacities=pv_metadata.observed_capacity_watt_power,
nominal_system_capacities=pv_metadata.capacity_watt_power,
ml_id=pv_metadata.ml_id,
latitude=pv_metadata.latitude,
longitude=pv_metadata.longitude,
Expand Down Expand Up @@ -339,3 +344,99 @@ def create_empty_pv_data(
data.iloc[mask, i] = 0.0
logger.debug(f"Finished adding zeros to pv data for elevation below {sun_elevation_limit}")
return data


@functional_datapipe("open_pv_from_pvsites_db")
class OpenPVFromPVSitesDBIterDataPipe(IterDataPipe):
    """Datapipe which loads PV generation and site metadata from the pvsites database"""

    def __init__(
        self,
        history_minutes: int = 30,
    ):
        """
        Set up the datapipe which reads PV data from the pvsites database

        Args:
            history_minutes: Length, in minutes, of the window of recent generation data
                to request from the database
        """

        super().__init__()

        self.history_minutes = history_minutes
        # Keep the same window as a Timedelta, the form the loader function is given
        self.history_duration = pd.Timedelta(self.history_minutes, unit="minutes")

    def __iter__(self):
        """Load the data once, then yield the same xarray object indefinitely"""
        site_metadata = get_metadata_from_pvsites_database()
        generation = get_pv_power_from_pvsites_database(history_duration=self.history_duration)

        # The database record is too short to estimate an observed maximum, so use NaN
        site_metadata["observed_capacity_wp"] = np.nan

        # Keep only the systems present in both the generation data and the metadata
        shared_ids = list(np.intersect1d(site_metadata.index, generation.columns))
        generation = generation[shared_ids]
        site_metadata = site_metadata.loc[shared_ids]

        # Combine the generation data and the per-system metadata into one xarray object.
        # `tilt` and `orientation` are optional columns: `.get` gives None if they are absent
        pv_array = put_pv_data_into_an_xr_dataarray(
            df_gen=generation,
            observed_system_capacities=site_metadata.observed_capacity_wp,
            nominal_system_capacities=site_metadata.nominal_capacity_wp,
            ml_id=site_metadata.ml_id,
            latitude=site_metadata.latitude,
            longitude=site_metadata.longitude,
            tilt=site_metadata.get("tilt"),
            orientation=site_metadata.get("orientation"),
        )

        logger.info(f"Found {len(pv_array.ml_id)} PV systems")

        # The database is queried only once, above; every iteration reuses that result
        while True:
            yield pv_array


def get_metadata_from_pvsites_database() -> pd.DataFrame:
    """Load the site metadata table from the pvsites database

    The database URL is read from the `DB_URL_PV` environment variable.

    Returns:
        pd.DataFrame holding every column of the `sites` table, indexed by `site_uuid`, plus a
        `nominal_capacity_wp` column equal to `capacity_kw` multiplied by 1000
    """
    connection = DatabaseConnection(url=os.getenv("DB_URL_PV"), base=Base_Forecast)

    # Fetch all rows while the connection is open; they are fully materialised by `fetchall`
    with connection.engine.connect() as conn:
        site_rows = conn.execute(text("SELECT * FROM sites")).fetchall()

    sites = pd.DataFrame(site_rows)

    # Rescale the capacity by 1000 (kW -> W, going by the column names)
    sites["nominal_capacity_wp"] = sites["capacity_kw"] * 1000

    return sites.set_index("site_uuid")


def get_pv_power_from_pvsites_database(history_duration: timedelta):
    """Load recent generation data from the pvsites database

    The database URL is read from the `DB_URL_PV` environment variable.

    Args:
        history_duration: Length of the window to load. Only rows of the `generation` table
            whose `end_utc` is at or after (current UTC time - history_duration) are returned

    Returns:
        pd.DataFrame of float32 values, indexed by `end_utc` with one column per `site_uuid`.
        The values are `generation_power_kw` multiplied by 1000
    """
    # Imported here so the function does not depend on the module-level datetime imports
    from datetime import timezone

    # make database connection
    url = os.getenv("DB_URL_PV")
    db_connection = DatabaseConnection(url=url, base=Base_Forecast)

    columns = "site_uuid, generation_power_kw, start_utc, end_utc"

    # The cutoff is compared against `end_utc`, so "now" must be taken in UTC. A bare
    # `datetime.now()` returns the machine's local time, which shifts the window on any host
    # that is not running in UTC. The tzinfo is dropped again so that the timestamp is
    # formatted without an offset, exactly as before.
    utc_now = datetime.now(tz=timezone.utc).replace(tzinfo=None)
    start_time = f"{utc_now - history_duration}"

    with db_connection.engine.connect() as conn:
        df_db_raw = pd.DataFrame(
            conn.execute(
                text(f"SELECT {columns} FROM generation where end_utc >= '{start_time}'")
            ).fetchall()
        )

    # Reshape from one row per (time, site) to a table of times x sites
    df_gen = df_db_raw.pivot(index="end_utc", columns="site_uuid", values="generation_power_kw")

    # Rescale from kW to W
    df_gen = df_gen * 1000

    # Fix data types
    df_gen = df_gen.astype(np.float32)

    return df_gen
106 changes: 46 additions & 60 deletions ocf_datapipes/load/pv/pv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union
from typing import Optional, Union

import fsspec
import numpy as np
Expand Down Expand Up @@ -48,39 +48,21 @@ def __init__(
self.end_datetime = pv.end_datetime

def __iter__(self):
pv_datas_xr = []
pv_array_list = []
for i in range(len(self.pv_power_filenames)):
one_data: xr.DataArray = load_everything_into_ram(
pv_array: xr.DataArray = load_everything_into_ram(
self.pv_power_filenames[i],
self.pv_metadata_filenames[i],
start_datetime=self.start_datetime,
end_datetime=self.end_datetime,
inferred_metadata_filename=self.inferred_metadata_filenames[i],
)
pv_datas_xr.append(one_data)
pv_array_list.append(pv_array)

data = join_pv(pv_datas_xr)
pv_array = xr.concat(pv_array_list, dim="pv_system_id")

while True:
yield data


def join_pv(data_arrays: List[xr.DataArray]) -> xr.DataArray:
"""Join PV data arrays together.

Args:
data_arrays: List of PV data arrays

Returns: one data array containing all pv systems
"""

if len(data_arrays) == 1:
return data_arrays[0]

# expand each dataset to full time_utc
joined_data_array = xr.concat(data_arrays, dim="pv_system_id")

return joined_data_array
yield pv_array


def load_everything_into_ram(
Expand Down Expand Up @@ -126,9 +108,10 @@ def load_everything_into_ram(
estimated_capacities = estimated_capacities.loc[common_systems]

# Compile data into an xarray DataArray
data_in_ram = put_pv_data_into_an_xr_dataarray(
xr_array = put_pv_data_into_an_xr_dataarray(
df_gen=df_gen,
system_capacities=estimated_capacities,
observed_system_capacities=estimated_capacities,
nominal_system_capacities=df_metadata.capacity_watts,
ml_id=df_metadata.ml_id,
latitude=df_metadata.latitude,
longitude=df_metadata.longitude,
Expand All @@ -137,11 +120,11 @@ def load_everything_into_ram(
)

# Sanity checks
time_utc = pd.DatetimeIndex(data_in_ram.time_utc)
time_utc = pd.DatetimeIndex(xr_array.time_utc)
assert time_utc.is_monotonic_increasing
assert time_utc.is_unique

return data_in_ram
return xr_array


def _load_pv_generation_and_capacity(
Expand Down Expand Up @@ -241,48 +224,51 @@ def _load_pv_metadata(filename: str, inferred_filename: Optional[str] = None) ->
Shape of the returned pd.DataFrame for Passiv PV data:
Index: ss_id (Sheffield Solar ID)
Columns: llsoacd, orientation, tilt, kwp, operational_at, latitude, longitude, system_id,
ml_id
ml_id, capacity_watts
"""
_log.info(f"Loading PV metadata from {filename}")

if "passiv" in str(filename):
index_col = "ss_id"
else:
index_col = "system_id"

index_col = "ss_id" if "passiv" in str(filename) else "system_id"
df_metadata = pd.read_csv(filename, index_col=index_col)

# Maybe load inferred metadata if passiv
if inferred_filename is not None:
df_metadata = _load_inferred_metadata(filename, df_metadata)

if "Unnamed: 0" in df_metadata.columns:
df_metadata.drop(columns="Unnamed: 0", inplace=True)
# Drop if exists
df_metadata.drop(columns="Unnamed: 0", inplace=True, errors="ignore")

# Add ml_id column if not in metadata
# Add ml_id column if not in metadata already
if "ml_id" not in df_metadata.columns:
df_metadata["ml_id"] = np.nan

_log.info(f"Found {len(df_metadata)} PV systems in {filename}")
if "passiv" in str(filename):
# Add capacity in watts
df_metadata["capacity_watts"] = df_metadata.kwp * 1000
# Maybe load inferred metadata if passiv
if inferred_filename is not None:
df_metadata = _load_inferred_metadata(filename, df_metadata)
else:
# For PVOutput.org data
df_metadata["capacity_watts"] = df_metadata.system_size_watts
# Rename PVOutput.org tilt name to be simpler
# There is a second degree tilt, but this should be fine for now
if "array_tilt_degrees" in df_metadata.columns:
df_metadata["tilt"] = df_metadata["array_tilt_degrees"]

# Need to change orientation to a number if a string (i.e. SE) that PVOutput.org uses by
# default
mapping = {
"N": 0.0,
"NE": 45.0,
"E": 90.0,
"SE": 135.0,
"S": 180.0,
"SW": 225.0,
"W": 270.0,
"NW": 315.0,
}

# Any other keys other than those in the dict above are mapped to NaN
df_metadata["orientation"] = df_metadata.orientation.map(mapping)

# Rename PVOutput.org tilt name to be simpler
# There is a second degree tilt, but this should be fine for now
if "array_tilt_degrees" in df_metadata.columns:
df_metadata["tilt"] = df_metadata["array_tilt_degrees"]

# Need to change orientation to a number if a string (i.e. SE) that PVOutput.org uses by default
mapping = {
"S": 180.0,
"SE": 135.0,
"SW": 225.0,
"E": 90.0,
"W": 270.0,
"N": 0.0,
"NE": 45.0,
"NW": 315.0,
"EW": np.nan,
}
df_metadata = df_metadata.replace({"orientation": mapping})
_log.info(f"Found {len(df_metadata)} PV systems in {filename}")

return df_metadata

Expand Down
26 changes: 17 additions & 9 deletions ocf_datapipes/load/pv/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

def put_pv_data_into_an_xr_dataarray(
df_gen: pd.DataFrame,
system_capacities: pd.Series,
observed_system_capacities: pd.Series,
nominal_system_capacities: pd.Series,
ml_id: pd.Series,
longitude: pd.Series,
latitude: pd.Series,
Expand All @@ -24,7 +25,9 @@ def put_pv_data_into_an_xr_dataarray(
Args:
df_gen: pd.DataFrame where the columns are PV systems (and the column names are ints), and
the index is UTC datetime
system_capacities: The max power output of each PV system in Watts. Index is PV system IDs.
observed_system_capacities: The max power output observed in the time series for PV system
in watts. Index is PV system IDs
nominal_system_capacities: The metadata value for each PV system capacities in watts
ml_id: The `ml_id` used to identify each PV system
longitude: longitude of the locations
latitude: latitude of the locations
Expand All @@ -34,14 +37,18 @@ def put_pv_data_into_an_xr_dataarray(
# Sanity check!
system_ids = df_gen.columns
for name, series in (
("observed_system_capacities", observed_system_capacities),
("nominal_system_capacities", nominal_system_capacities),
("ml_id", ml_id),
("longitude", longitude),
("latitude", latitude),
("system_capacities", system_capacities),
("tilt", tilt),
("orientation", orientation),
):
logger.debug(f"Checking {name}")
if not np.array_equal(series.index, system_ids, equal_nan=True):
logger.debug(f"Index of {name} does not equal {system_ids}. Index is {series.index}")
assert np.array_equal(series.index, system_ids, equal_nan=True)
if (series is not None) and (not np.array_equal(series.index, system_ids)):
raise ValueError(
f"Index of {name} does not equal {system_ids}. Index is {series.index}"
)

data_array = xr.DataArray(
data=df_gen.values,
Expand All @@ -53,10 +60,11 @@ def put_pv_data_into_an_xr_dataarray(
).astype(np.float32)

data_array = data_array.assign_coords(
observed_capacity_wp=("pv_system_id", observed_system_capacities),
nominal_capacity_wp=("pv_system_id", nominal_system_capacities),
ml_id=("pv_system_id", ml_id),
longitude=("pv_system_id", longitude),
latitude=("pv_system_id", latitude),
capacity_watt_power=("pv_system_id", system_capacities),
ml_id=("pv_system_id", ml_id),
)

if tilt is not None:
Expand Down
2 changes: 1 addition & 1 deletion ocf_datapipes/select/drop_pv_sys_generating_overnight.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __iter__(self) -> xr.DataArray():
ds_night = ds.where(ds.status_daynight == "night", drop=True)

# Find relative maximum night-time generation for each system
night_time_max_gen = (ds_night / ds_night.capacity_watt_power).max(dim="time_utc")
night_time_max_gen = (ds_night / ds_night.observed_capacity_wp).max(dim="time_utc")

# Find systems above threshold
mask = night_time_max_gen > self.threshold
Expand Down
Loading