Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add elexon forecast route #350

Merged
merged 15 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:
uses: actions/checkout@v2

- name: Build Docker image
run: docker-compose -f test-docker-compose.yml build
run: docker compose -f test-docker-compose.yml build

- name: Run tests inside the container
run: docker-compose -f test-docker-compose.yml run api
run: docker compose -f test-docker-compose.yml run api

- name: Copy coverage
run: |
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ slowapi
pathy==0.10.3
fsspec
s3fs
elexonpy
1 change: 0 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ async def add_process_time_header(request: Request, call_next):
# Dependency
v0_route_solar = "/v0/solar/GB"
v0_route_system = "/v0/system/GB"

app.include_router(national_router, prefix=f"{v0_route_solar}/national")
app.include_router(gsp_router, prefix=f"{v0_route_solar}/gsp")
app.include_router(status_router, prefix=f"{v0_route_solar}")
Expand Down
104 changes: 83 additions & 21 deletions src/national.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""National API routes"""

import os
from datetime import datetime, timedelta
from typing import List, Optional, Union

import pandas as pd
import structlog
from fastapi import APIRouter, Depends, HTTPException, Request, Security
from elexonpy.api.generation_forecast_api import GenerationForecastApi
from elexonpy.api_client import ApiClient
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Security
from fastapi_auth0 import Auth0User
from nowcasting_datamodel.read.read import get_latest_forecast_for_gsps
from sqlalchemy.orm.session import Session
Expand All @@ -16,19 +20,28 @@
get_session,
get_truth_values_for_a_specific_gsp_from_database,
)
from pydantic_models import NationalForecast, NationalForecastValue, NationalYield
from pydantic_models import (
NationalForecast,
NationalForecastValue,
NationalYield,
SolarForecastResponse,
SolarForecastValue,
)
from utils import N_CALLS_PER_HOUR, filter_forecast_values, format_datetime, format_plevels, limiter

logger = structlog.stdlib.get_logger()


adjust_limit = float(os.getenv("ADJUST_MW_LIMIT", 0.0))
get_plevels = bool(os.getenv("GET_PLEVELS", True))

router = APIRouter(
tags=["National"],
)

# Initialize Elexon API client
api_client = ApiClient()
elexon_forecast_api = GenerationForecastApi(api_client)


@router.get(
"/forecast",
Expand All @@ -47,7 +60,9 @@ def get_national_forecast(
end_datetime_utc: Optional[str] = None,
creation_limit_utc: Optional[str] = None,
) -> Union[NationalForecast, List[NationalForecastValue]]:
"""Get the National Forecast
"""

Fetch national forecasts.

This route returns the most recent forecast for each _target_time_.

Expand All @@ -67,6 +82,9 @@ def get_national_forecast(
- **creation_utc_limit**: optional, only return forecasts made before this datetime.
Note you can only go 7 days back at the moment

Returns:
dict: The national forecast data.

"""
logger.debug("Get national forecasts")

Expand Down Expand Up @@ -182,35 +200,79 @@ def get_national_pvlive(

#### Parameters
- **regime**: can choose __in-day__ or __day-after__
"""

"""
logger.info(f"Get national PV Live estimates values " f"for regime {regime} for {user}")

return get_truth_values_for_a_specific_gsp_from_database(
session=session, gsp_id=0, regime=regime
)


@router.get(
"/bmrs",
response_model=dict,
# dependencies=[Depends(get_auth_implicit_scheme())],
summary="Get BMRS Forecast",
)
# @cache_response
@router.get("/elexon", summary="Get elexon Solar Forecast")
@limiter.limit(f"{N_CALLS_PER_HOUR}/hour")
def get_bmrs_forecast(
def get_elexon_forecast(
request: Request,
# session: Session = Depends(get_session),
# user: Auth0User = Security(get_user()),
) -> dict:
start_datetime_utc: datetime = Query(
default=datetime.utcnow() - timedelta(days=3), description="Start date and time in UTC"
),
end_datetime_utc: datetime = Query(
default=datetime.utcnow() + timedelta(days=3), description="End date and time in UTC"
),
process_type: str = Query("Day Ahead", description="Process type"),
):
"""
Fetch elexon Solar forecasts from the Elexon API.

This route returns the most recent BMRS forecast for each _target_time_.

#### Parameters
#### Parameters:
- **start_datetime_utc** (datetime): The start date and time in UTC.
- **end_datetime_utc** (datetime): The end date and time in UTC.
- **process_type** (str): The type of process
(e.g., 'Day Ahead', 'Intraday Process' or 'Intraday Total').

Returns:
SolarForecastResponse: The forecast data wrapped in a SolarForecastResponse model.
"""
logger.debug("Get bmrs forecast")

return {"message": "This route is not yet implemented. Please check back later."}
try:
response = elexon_forecast_api.forecast_generation_wind_and_solar_day_ahead_get(
_from=start_datetime_utc.isoformat(),
to=end_datetime_utc.isoformat(),
process_type=process_type,
format="json",
)
except Exception as e:
logger.error("Unhandled exception when collecting ELexon Data: %s", str(e))
raise HTTPException(
status_code=500, detail="Internal Server Error when collecting Elexon Data"
)

if not response.data:
return SolarForecastResponse(data=[])

df = pd.DataFrame([item.to_dict() for item in response.data])
logger.debug("DataFrame Columns: %s", df.columns)
logger.debug("DataFrame Sample: %s", df.head())

# Filter to include only solar forecasts
solar_df = df[df["business_type"] == "Solar generation"]
logger.debug("Filtered Solar DataFrame: %s", solar_df.head())

forecast_values = []
for _, row in solar_df.iterrows():
try:
forecast_values.append(
SolarForecastValue(
timestamp=pd.to_datetime(row["start_time"]).to_pydatetime(),
expected_power_generation_megawatts=row.get("quantity"),
)
)
except KeyError as e:
logger.error("KeyError: %s. Data: %s", str(e), row)
raise HTTPException(status_code=500, detail="Internal Server Error")
except Exception as e:
logger.error("Error during DataFrame to Model conversion: %s. Data: %s", str(e), row)
raise HTTPException(status_code=500, detail="Internal Server Error")

result = SolarForecastResponse(data=forecast_values)
return result
24 changes: 23 additions & 1 deletion src/pydantic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from nowcasting_datamodel.models import Forecast, ForecastSQL, ForecastValue, Location, LocationSQL
from nowcasting_datamodel.models.utils import EnhancedBaseModel
from pydantic import Field, validator
from pydantic import BaseModel, Field, validator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -214,3 +214,25 @@ class NationalForecast(Forecast):
"""One Forecast of generation at one timestamp"""

forecast_values: List[NationalForecastValue] = Field(..., description="List of forecast values")


class SolarForecastValue(BaseModel):
"""Represents a single solar forecast entry"""

timestamp: datetime = Field(..., description="Timestamp of the forecast")
expected_power_generation_megawatts: Optional[float] = Field(
None, ge=0, description="Expected power generation in megawatts"
)

@validator("expected_power_generation_megawatts")
def result_check(cls, v):
"""Round to 2 decimal places"""
if v is not None:
return round(v, 2)
return v


class SolarForecastResponse(BaseModel):
"""Wrapper for a list of solar forecast values"""

data: List[SolarForecastValue] = Field(..., description="List of solar forecast values")
69 changes: 69 additions & 0 deletions src/tests/test_elexon_forecast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import Optional
from unittest.mock import MagicMock, patch

import pandas as pd
import pytest
from elexonpy.api.generation_forecast_api import GenerationForecastApi

from pydantic_models import BaseModel, SolarForecastResponse

API_URL = "/v0/solar/GB/national/elexon"


class MockClass(BaseModel):
start_time: str
quantity: float
business_type: Optional[str] = "Solar generation"

def to_dict(self):
return self.__dict__


mock_data = [
MockClass(
**{
"start_time": "2024-07-24T16:00:00+00:00",
"quantity": 0,
}
),
MockClass(
**{
"start_time": "2024-07-24T16:30:00+00:00",
"quantity": 0,
}
),
]
mock_response = MagicMock()
mock_response.data = mock_data


@patch.object(GenerationForecastApi, "forecast_generation_wind_and_solar_day_ahead_get")
def test_get_elexon_forecast_mock(mock_function, api_client):
# Set mock_response
mock_function.return_value = mock_response

# Call the API endpoint
response = api_client.get(API_URL)
print("Response Headers:", response.headers)
# Assertions
assert response.status_code == 200
assert response.headers.get("Content-Type") == "application/json"

api_data = response.json()["data"]
assert len(api_data) == len(mock_data)
for i in range(len(api_data)):
assert api_data[i]["expected_power_generation_megawatts"] == mock_data[i].quantity
assert pd.Timestamp(api_data[i]["timestamp"]) == pd.Timestamp(mock_data[i].start_time)


@pytest.mark.integration
def test_get_elexon_forecast(api_client):
response = api_client.get(API_URL)

# Assertions
assert response.status_code == 200
assert response.headers.get("Content-Type") == "application/json"

solar_forecast = SolarForecastResponse(**response.json())

assert len(solar_forecast.data) > 0
Loading