Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elexon forecast route #350

Merged
merged 15 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ slowapi
pathy==0.10.3
fsspec
s3fs
elexonpy
1 change: 0 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ async def add_process_time_header(request: Request, call_next):
# Dependency
v0_route_solar = "/v0/solar/GB"
v0_route_system = "/v0/system/GB"

app.include_router(national_router, prefix=f"{v0_route_solar}/national")
app.include_router(gsp_router, prefix=f"{v0_route_solar}/gsp")
app.include_router(status_router, prefix=f"{v0_route_solar}")
Expand Down
119 changes: 89 additions & 30 deletions src/national.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""National API routes"""

import os
from datetime import datetime, timedelta
from typing import List, Optional, Union

import pandas as pd
import structlog
from fastapi import APIRouter, Depends, HTTPException, Request, Security
from elexonpy.api.generation_forecast_api import GenerationForecastApi
from elexonpy.api_client import ApiClient
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Security
from fastapi_auth0 import Auth0User
from nowcasting_datamodel.read.read import get_latest_forecast_for_gsps
from sqlalchemy.orm.session import Session
Expand All @@ -16,19 +20,94 @@
get_session,
get_truth_values_for_a_specific_gsp_from_database,
)
from pydantic_models import NationalForecast, NationalForecastValue, NationalYield
from pydantic_models import (
NationalForecast,
NationalForecastValue,
NationalYield,
SolarForecastResponse,
SolarForecastValue,
)
from utils import N_CALLS_PER_HOUR, filter_forecast_values, format_datetime, format_plevels, limiter

logger = structlog.stdlib.get_logger()


adjust_limit = float(os.getenv("ADJUST_MW_LIMIT", 0.0))
get_plevels = bool(os.getenv("GET_PLEVELS", True))

router = APIRouter(
tags=["National"],
)

# Initialize Elexon API client
api_client = ApiClient()
forecast_api = GenerationForecastApi(api_client)
forecast_generation_wind_and_solar_day_ahead_get = forecast_api.forecast_generation_wind_and_solar_day_ahead_get


@router.get("/elexon", summary="Get elexon Solar Forecast")
@limiter.limit(f"{N_CALLS_PER_HOUR}/hour")
def get_elexon_forecast(
request: Request,
start_datetime_utc: datetime = Query(
default=datetime.utcnow() - timedelta(days=3), description="Start date and time in UTC"
),
end_datetime_utc: datetime = Query(
default=datetime.utcnow() + timedelta(days=3), description="End date and time in UTC"
),
process_type: str = Query("Day Ahead", description="Process type"),
):
"""
Fetch elexon Solar and wind(?) forecasts from the Elexon API.

Args:
request (Request): The request object containing metadata about the HTTP request.
start_datetime_utc (datetime): The start date and time in UTC.
end_datetime_utc (datetime): The end date and time in UTC.
process_type (str): The type of process (e.g., 'Day Ahead').

Returns:
SolarForecastResponse: The forecast data wrapped in a SolarForecastResponse model.
"""

response = forecast_generation_wind_and_solar_day_ahead_get(
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved
_from=start_datetime_utc.isoformat(),
to=end_datetime_utc.isoformat(),
process_type=process_type,
format="json",
)

if not response.data:
return SolarForecastResponse(data=[])

df = pd.DataFrame([item.to_dict() for item in response.data])
logger.debug("DataFrame Columns: %s", df.columns)
logger.debug("DataFrame Sample: %s", df.head())

# Filter to include only solar forecasts
solar_df = df[df["business_type"] == "Solar generation"]
logger.debug("Filtered Solar DataFrame: %s", solar_df.head())

forecast_values = []
for _, row in solar_df.iterrows():
try:
forecast_values.append(
SolarForecastValue(
timestamp=pd.to_datetime(row["publish_time"]).to_pydatetime(),
expected_power_generation_megawatts=row.get("quantity"),
)
)
except KeyError as e:
logger.error("KeyError: %s. Data: %s", str(e), row)
raise HTTPException(status_code=500, detail="Internal Server Error")
except Exception as e:
logger.error(
"Error during DataFrame to Model conversion: %s. Data: %s", str(e), row
)
raise HTTPException(status_code=500, detail="Internal Server Error")

result = SolarForecastResponse(data=forecast_values)
return result


@router.get(
"/forecast",
Expand All @@ -47,7 +126,9 @@ def get_national_forecast(
end_datetime_utc: Optional[str] = None,
creation_limit_utc: Optional[str] = None,
) -> Union[NationalForecast, List[NationalForecastValue]]:
"""Get the National Forecast
"""

Fetch national forecasts.

This route returns the most recent forecast for each _target_time_.

Expand All @@ -67,6 +148,9 @@ def get_national_forecast(
- **creation_utc_limit**: optional, only return forecasts made before this datetime.
Note you can only go 7 days back at the moment

Returns:
dict: The national forecast data.

"""
logger.debug("Get national forecasts")

Expand Down Expand Up @@ -182,35 +266,10 @@ def get_national_pvlive(

#### Parameters
- **regime**: can choose __in-day__ or __day-after__
"""

"""
logger.info(f"Get national PV Live estimates values " f"for regime {regime} for {user}")

return get_truth_values_for_a_specific_gsp_from_database(
session=session, gsp_id=0, regime=regime
)


@router.get(
"/bmrs",
response_model=dict,
# dependencies=[Depends(get_auth_implicit_scheme())],
summary="Get BMRS Forecast",
)
# @cache_response
@limiter.limit(f"{N_CALLS_PER_HOUR}/hour")
def get_bmrs_forecast(
request: Request,
# session: Session = Depends(get_session),
# user: Auth0User = Security(get_user()),
) -> dict:
"""

This route returns the most recent BMRS forecast for each _target_time_.

#### Parameters

"""
logger.debug("Get bmrs forecast")

return {"message": "This route is not yet implemented. Please check back later."}
24 changes: 23 additions & 1 deletion src/pydantic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from nowcasting_datamodel.models import Forecast, ForecastSQL, ForecastValue, Location, LocationSQL
from nowcasting_datamodel.models.utils import EnhancedBaseModel
from pydantic import Field, validator
from pydantic import BaseModel, Field, validator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -214,3 +214,25 @@ class NationalForecast(Forecast):
"""One Forecast of generation at one timestamp"""

forecast_values: List[NationalForecastValue] = Field(..., description="List of forecast values")


class SolarForecastValue(BaseModel):
"""Represents a single solar forecast entry"""

timestamp: datetime = Field(..., description="Timestamp of the forecast")
expected_power_generation_megawatts: Optional[float] = Field(
None, ge=0, description="Expected power generation in megawatts"
)

@validator("expected_power_generation_megawatts")
def result_check(cls, v):
"""Round to 2 decimal places"""
if v is not None:
return round(v, 2)
return v


class SolarForecastResponse(BaseModel):
"""Wrapper for a list of solar forecast values"""

data: List[SolarForecastValue] = Field(..., description="List of solar forecast values")
59 changes: 59 additions & 0 deletions src/tests/test_elexon_forecast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from unittest.mock import patch

import pandas as pd

from pydantic_models import BaseModel
from typing import Optional

# InsightsApiModelsResponsesResponseWithMetadata1InsightsApiModelsResponsesTransparencyDayAheadGenerationForWindAndSolar
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved

API_URL = "/v0/solar/GB/national/elexon"


class MockClass(BaseModel):

publish_time: str
quantity: float
business_type: Optional[str] = "Solar generation"

def to_dict(self):
return self.__dict__


mock_data = [
MockClass(
**{
"publish_time": "2024-07-24T16:45:09+00:00",
"quantity": 0,
}
),
MockClass(
**{
"publish_time": "2024-07-24T16:45:09+00:00",
"quantity": 0,
}
),
]


class MockResponse:
def __init__(self):
self.data = mock_data


@patch("national.forecast_generation_wind_and_solar_day_ahead_get")
peterdudfield marked this conversation as resolved.
Show resolved Hide resolved
def test_get_elexon_forecast_with_data(mock_function, api_client):
mock_function.return_value = MockResponse()

response = api_client.get("/v0/solar/GB/national/elexon")
print("Response Headers:", response.headers)
# Assertions
assert response.status_code == 200
assert response.headers.get("Content-Type") == "application/json"

api_data = response.json()["data"]
assert len(api_data) == len(mock_data)
for i in range(len(api_data)):
assert api_data[i]["expected_power_generation_megawatts"] == mock_data[i].quantity
assert pd.Timestamp(api_data[i]["timestamp"]) == pd.Timestamp(mock_data[i].publish_time)

Loading