Skip to content

Commit

Permalink
Merge pull request #119 from openclimatefix/issue/gsp-dno-total
Browse files Browse the repository at this point in the history
Issue/gsp dno total
  • Loading branch information
peterdudfield authored Oct 20, 2023
2 parents eb53cd3 + 6fcb7cc commit 07333d8
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 131 deletions.
320 changes: 210 additions & 110 deletions poetry.lock

Large diffs are not rendered by default.

89 changes: 75 additions & 14 deletions pv_site_api/_db_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from pvsite_datamodel.read.generation import get_pv_generation_by_sites
from pvsite_datamodel.read.user import get_user_by_email
from pvsite_datamodel.sqlmodels import ForecastSQL, ForecastValueSQL, SiteGroupSiteSQL, SiteSQL
from sqlalchemy import func
from sqlalchemy.orm import Session, aliased

from .convert import (
forecast_rows_sums_to_pydantic_objects,
forecast_rows_to_pydantic,
forecast_rows_to_pydantic_compact,
generation_rows_to_pydantic,
Expand Down Expand Up @@ -49,14 +51,14 @@ def _get_forecasts_for_horizon(
start_utc: dt.datetime,
end_utc: dt.datetime,
horizon_minutes: int,
sum_by: Optional[str] = None,
) -> list[Row]:
"""Get the forecasts for given sites for a given horizon."""
stmt = (
sa.select(ForecastSQL, ForecastValueSQL)
query = (
session.query(ForecastSQL, ForecastValueSQL)
# We need a DISTINCT ON statement in cases where we have run two forecasts for the same
# time. In practice this shouldn't happen often.
.distinct(ForecastSQL.site_uuid, ForecastSQL.timestamp_utc)
.select_from(ForecastSQL)
.join(ForecastValueSQL)
.where(ForecastSQL.site_uuid.in_(site_uuids))
# Also filtering on `timestamp_utc` makes the query faster.
Expand All @@ -68,11 +70,35 @@ def _get_forecasts_for_horizon(
.order_by(ForecastSQL.site_uuid, ForecastSQL.timestamp_utc)
)

return list(session.execute(stmt))
if sum_by is not None:
subquery = query.subquery()

group_by_variables = [subquery.c.start_utc]
if sum_by == "dno":
group_by_variables.append(SiteSQL.dno)
if sum_by == "gsp":
group_by_variables.append(SiteSQL.gsp)
query_variables = group_by_variables.copy()
query_variables.append(func.sum(subquery.c.forecast_power_kw))

query = session.query(*query_variables)
query = query.join(ForecastSQL, ForecastSQL.forecast_uuid == subquery.c.forecast_uuid)
query = query.join(SiteSQL)
query = query.group_by(*group_by_variables)
query = query.order_by(*group_by_variables)
forecasts_raw = query.all()

else:
forecasts_raw = query.all()

return forecasts_raw


def _get_latest_forecast_by_sites(
session: Session, site_uuids: list[str], start_utc: Optional[dt.datetime] = None
session: Session,
site_uuids: list[str],
start_utc: Optional[dt.datetime] = None,
sum_by: Optional[str] = None,
) -> list[Row]:
"""Get the latest forecast for given site uuids."""
# Get the latest forecast for each site.
Expand All @@ -99,7 +125,27 @@ def _get_latest_forecast_by_sites(

query.order_by(forecast_subq.timestamp_utc, ForecastValueSQL.start_utc)

return query.all()
if sum_by is None:
return query.all()
else:
subquery = query.subquery()

group_by_variables = [subquery.c.start_utc]
if sum_by == "dno":
group_by_variables.append(SiteSQL.dno)
if sum_by == "gsp":
group_by_variables.append(SiteSQL.gsp)
query_variables = group_by_variables.copy()
query_variables.append(func.sum(subquery.c.forecast_power_kw))

query = session.query(*query_variables)
query = query.join(ForecastSQL, ForecastSQL.forecast_uuid == subquery.c.forecast_uuid)
query = query.join(SiteSQL)
query = query.group_by(*group_by_variables)
query = query.order_by(*group_by_variables)
forecasts_raw = query.all()

return forecasts_raw


def get_forecasts_by_sites(
Expand All @@ -108,6 +154,7 @@ def get_forecasts_by_sites(
start_utc: dt.datetime,
horizon_minutes: int,
compact: bool = False,
sum_by: Optional[str] = None,
) -> Union[list[Forecast], ManyForecastCompact]:
"""Combination of the latest forecast and the past forecasts, for given sites.
Expand All @@ -124,36 +171,50 @@ def get_forecasts_by_sites(
start_utc=start_utc,
end_utc=end_utc,
horizon_minutes=horizon_minutes,
sum_by=sum_by,
)
logger.debug("Found %s past forecasts", len(rows_past))

rows_future = _get_latest_forecast_by_sites(
session=session, site_uuids=site_uuids, start_utc=start_utc
session=session, site_uuids=site_uuids, start_utc=start_utc, sum_by=sum_by
)
logger.debug("Found %s future forecasts", len(rows_future))

logger.debug("Formatting forecasts to pydantic objects")
if compact:
forecasts = forecast_rows_to_pydantic_compact(rows_past + rows_future)
if sum_by is not None:
forecasts = forecast_rows_sums_to_pydantic_objects(rows_future + rows_past)
else:
forecasts = forecast_rows_to_pydantic(rows_past + rows_future)
logger.debug("Formatting forecasts to pydantic objects: done")
logger.debug("Formatting forecasts to pydantic objects")
if compact:
forecasts = forecast_rows_to_pydantic_compact(rows_past + rows_future)
else:
forecasts = forecast_rows_to_pydantic(rows_past + rows_future)
logger.debug("Formatting forecasts to pydantic objects: done")

return forecasts


def get_generation_by_sites(
session: Session, site_uuids: list[str], start_utc: dt.datetime, compact: bool = False
session: Session,
site_uuids: list[str],
start_utc: dt.datetime,
compact: bool = False,
sum_by: Optional[str] = None,
) -> Union[list[MultiplePVActual], MultipleSitePVActualCompact]:
"""Get the generation since yesterday (midnight) for a list of sites."""
logger.info(f"Getting generation for {len(site_uuids)} sites")
rows = get_pv_generation_by_sites(
session=session, start_utc=start_utc, site_uuids=[uuid.UUID(su) for su in site_uuids]
session=session,
start_utc=start_utc,
site_uuids=[uuid.UUID(su) for su in site_uuids],
sum_by=sum_by,
)

# Go through the rows and split the data by site.
pv_actual_values_per_site: dict[str, list[PVActualValue]] = defaultdict(list)

if sum_by is not None:
return rows

# TODO can we speed this up?
if not compact:
return generation_rows_to_pydantic(pv_actual_values_per_site, rows, site_uuids)
Expand Down
20 changes: 20 additions & 0 deletions pv_site_api/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import numpy as np
import structlog
from pvsite_datamodel.pydantic_models import ForecastValueSum

from pv_site_api.pydantic_models import (
Forecast,
Expand Down Expand Up @@ -166,3 +167,22 @@ def generation_rows_to_pydantic_compact(rows) -> MultipleSitePVActualCompact:
return MultipleSitePVActualCompact(
pv_actual_values_many_site=multiple_pv_actuals, start_utc_idx=start_utc_idx
)


def forecast_rows_sums_to_pydantic_objects(rows):
    """Convert pre-summed forecast rows into ``ForecastValueSum`` objects.

    The rows are aggregates computed in the database. A 2-tuple
    ``(start_utc, power)`` is a grand total across all sites; a 3-tuple
    ``(start_utc, name, power)`` is a sum grouped by dno or gsp, where
    ``name`` identifies the group.
    """
    results = []
    for row in rows:
        if len(row) == 2:
            # Grand total: no grouping column, label the row "total".
            start_utc, power_kw, group_name = row[0], row[1], "total"
        else:
            # Grouped sum: middle column carries the dno/gsp name.
            start_utc, group_name, power_kw = row[0], row[1], row[2]
        results.append(
            ForecastValueSum(start_utc=start_utc, power_kw=power_kw, name=group_name)
        )
    return results
20 changes: 17 additions & 3 deletions pv_site_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from fastapi.openapi.utils import get_openapi
from fastapi.responses import FileResponse, Response
from pvlib import irradiance, location, pvsystem
from pvsite_datamodel.pydantic_models import GenerationSum
from pvsite_datamodel.read.status import get_latest_status
from pvsite_datamodel.read.user import get_user_by_email
from pvsite_datamodel.sqlmodels import SiteSQL
Expand Down Expand Up @@ -319,17 +320,21 @@ def get_pv_actual(


@app.get(
"/sites/pv_actual", response_model=Union[list[MultiplePVActual], MultipleSitePVActualCompact]
"/sites/pv_actual",
response_model=Union[list[MultiplePVActual], list[GenerationSum], MultipleSitePVActualCompact],
)
@cache_response
def get_pv_actual_many_sites(
site_uuids: str,
session: Session = Depends(get_session),
sum_by: Optional[str] = None,
auth: dict = Depends(auth),
compact: bool = False,
):
"""
### Get the actual power generation for a list of sites.
sum_by: can be None, 'total', 'dno' or 'gsp'
"""
site_uuids_list = site_uuids.split(",")

Expand All @@ -341,7 +346,7 @@ def get_pv_actual_many_sites(
start_utc = get_yesterday_midnight()

return get_generation_by_sites(
session, site_uuids=site_uuids_list, start_utc=start_utc, compact=compact
session, site_uuids=site_uuids_list, start_utc=start_utc, compact=compact, sum_by=sum_by
)


Expand Down Expand Up @@ -388,10 +393,13 @@ def get_pv_forecast_many_sites(
site_uuids: str,
session: Session = Depends(get_session),
auth: dict = Depends(auth),
sum_by: Optional[str] = None,
compact: bool = False,
):
"""
### Get the forecasts for multiple sites.
sum_by: can be None, 'total', 'dno' or 'gsp'
"""

logger.info(f"Getting forecasts for {site_uuids}")
Expand All @@ -407,7 +415,12 @@ def get_pv_forecast_many_sites(
logger.debug(f"Loading forecast from {start_utc}")

forecasts = get_forecasts_by_sites(
session, site_uuids=site_uuids_list, start_utc=start_utc, horizon_minutes=0, compact=compact
session,
site_uuids=site_uuids_list,
start_utc=start_utc,
horizon_minutes=0,
compact=compact,
sum_by=sum_by,
)

return forecasts
Expand All @@ -419,6 +432,7 @@ def get_pv_estimate_clearsky(
site_uuid: str,
session: Session = Depends(get_session),
auth: dict = Depends(auth),
sum_by: Optional[str] = None,
):
"""
### Gets a estimate of AC production under a clear sky
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ packages = [{include = "pv_site_api"}]

[tool.poetry.dependencies]
python = "^3.10"
pydantic = "^1.10.5"
pydantic = "2.3"
uvicorn = {extras = ["standard"], version = "^0.20.0"}
psycopg2-binary = "^2.9.5"
sqlalchemy = "^1.4.46"
fastapi = "^0.92.0"
fastapi = "0.103.1"
httpx = "^0.23.3"
sentry-sdk = "^1.16.0"
pvlib = "^0.9.5"
structlog = "^22.3.0"
pyjwt = {extras = ["crypto"], version = "^2.6.0"}
pvsite-datamodel = "1.0.1"
pvsite-datamodel = "1.0.2"

[tool.poetry.group.dev.dependencies]
isort = "^5.12.0"
Expand Down
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def sites(db_session):
num_sites = 3
for j in range(num_sites):
site = make_site(db_session=db_session, ml_id=j + 1)
site.dno = f"test_dno_{j}"
site.gsp = f"test_gsp_{j}"

sites.append(site)
site_group.sites.append(site)
Expand Down
61 changes: 61 additions & 0 deletions tests/test_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timedelta

from freezegun import freeze_time
from pvsite_datamodel.pydantic_models import ForecastValueSum
from pvsite_datamodel.sqlmodels import SiteSQL

from pv_site_api.pydantic_models import Forecast, ManyForecastCompact
Expand Down Expand Up @@ -123,6 +124,66 @@ def test_get_forecast_many_sites_late_forecast_one_day_compact(
assert len(f.forecasts) == len(sites)


def test_get_forecast_many_sites_late_forecast_one_day_total(
    db_session, client, forecast_values, sites
):
    """Summed ("total") forecasts when the forecast stopped running a day ago."""
    site_uuids_str = ",".join(str(site.site_uuid) for site in sites)
    tomorrow = datetime.utcnow() + timedelta(days=1)

    with freeze_time(tomorrow):
        resp = client.get(f"/sites/pv_forecast?site_uuids={site_uuids_str}&sum_by=total")
        assert resp.status_code == 200

    summed = [ForecastValueSum(**row) for row in resp.json()]

    # Fixtures create 10 forecasts with 11 values each; summing across sites
    # collapses the per-site rows, leaving 21 summed timestamps (latest
    # forecast plus past-horizon values) — independent of the number of sites.
    assert len(summed) == 21


def test_get_forecast_many_sites_late_forecast_one_day_dno(
    db_session, client, forecast_values, sites
):
    """Forecasts summed by DNO when the forecast stopped running a day ago."""
    site_uuids_str = ",".join(str(site.site_uuid) for site in sites)
    tomorrow = datetime.utcnow() + timedelta(days=1)

    with freeze_time(tomorrow):
        resp = client.get(f"/sites/pv_forecast?site_uuids={site_uuids_str}&sum_by=dno")
        assert resp.status_code == 200

    summed = [ForecastValueSum(**row) for row in resp.json()]

    # Each site is given a distinct dno in conftest, so grouping by dno
    # yields the 21 summed timestamps once per site.
    assert len(summed) == len(sites) * 21


def test_get_forecast_many_sites_late_forecast_one_day_gsp(
    db_session, client, forecast_values, sites
):
    """Forecasts summed by GSP when the forecast stopped running a day ago."""
    site_uuids_str = ",".join(str(site.site_uuid) for site in sites)
    tomorrow = datetime.utcnow() + timedelta(days=1)

    with freeze_time(tomorrow):
        resp = client.get(f"/sites/pv_forecast?site_uuids={site_uuids_str}&sum_by=gsp")
        assert resp.status_code == 200

    summed = [ForecastValueSum(**row) for row in resp.json()]

    # Each site is given a distinct gsp in conftest, so grouping by gsp
    # yields the 21 summed timestamps once per site.
    assert len(summed) == len(sites) * 21


def test_get_forecast_no_data(db_session, client, sites):
site = db_session.query(SiteSQL).first()

Expand Down
Loading

0 comments on commit 07333d8

Please sign in to comment.