diff --git a/.github/workflows/test-docker.yaml b/.github/workflows/test-docker.yaml index cb07a9a..54ecf20 100644 --- a/.github/workflows/test-docker.yaml +++ b/.github/workflows/test-docker.yaml @@ -15,10 +15,10 @@ jobs: uses: actions/checkout@v2 - name: Build Docker image - run: docker-compose -f test-docker-compose.yml build + run: docker compose -f test-docker-compose.yml build - name: Run tests inside the container - run: docker-compose -f test-docker-compose.yml run api + run: docker compose -f test-docker-compose.yml run api - name: Copy coverage run: | diff --git a/requirements.txt b/requirements.txt index b5c23b3..979d07d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,4 @@ slowapi pathy==0.10.3 fsspec s3fs +elexonpy diff --git a/src/main.py b/src/main.py index d401d35..08ed289 100644 --- a/src/main.py +++ b/src/main.py @@ -208,7 +208,6 @@ async def add_process_time_header(request: Request, call_next): # Dependency v0_route_solar = "/v0/solar/GB" v0_route_system = "/v0/system/GB" - app.include_router(national_router, prefix=f"{v0_route_solar}/national") app.include_router(gsp_router, prefix=f"{v0_route_solar}/gsp") app.include_router(status_router, prefix=f"{v0_route_solar}") diff --git a/src/national.py b/src/national.py index b73da1f..b9c0d5f 100644 --- a/src/national.py +++ b/src/national.py @@ -1,10 +1,14 @@ """National API routes""" import os +from datetime import datetime, timedelta from typing import List, Optional, Union +import pandas as pd import structlog -from fastapi import APIRouter, Depends, HTTPException, Request, Security +from elexonpy.api.generation_forecast_api import GenerationForecastApi +from elexonpy.api_client import ApiClient +from fastapi import APIRouter, Depends, HTTPException, Query, Request, Security from fastapi_auth0 import Auth0User from nowcasting_datamodel.read.read import get_latest_forecast_for_gsps from sqlalchemy.orm.session import Session @@ -16,12 +20,17 @@ get_session, get_truth_values_for_a_specific_gsp_from_database, ) -from pydantic_models import NationalForecast, NationalForecastValue, NationalYield +from pydantic_models import ( + NationalForecast, + NationalForecastValue, + NationalYield, + SolarForecastResponse, + SolarForecastValue, +) from utils import N_CALLS_PER_HOUR, filter_forecast_values, format_datetime, format_plevels, limiter logger = structlog.stdlib.get_logger() - adjust_limit = float(os.getenv("ADJUST_MW_LIMIT", 0.0)) get_plevels = bool(os.getenv("GET_PLEVELS", True)) @@ -29,6 +38,10 @@ tags=["National"], ) +# Initialize Elexon API client +api_client = ApiClient() +elexon_forecast_api = GenerationForecastApi(api_client) + @router.get( "/forecast", @@ -47,7 +60,9 @@ def get_national_forecast( end_datetime_utc: Optional[str] = None, creation_limit_utc: Optional[str] = None, ) -> Union[NationalForecast, List[NationalForecastValue]]: - """Get the National Forecast + """ + + Fetch national forecasts. This route returns the most recent forecast for each _target_time_. @@ -67,6 +82,9 @@ def get_national_forecast( - **creation_utc_limit**: optional, only return forecasts made before this datetime. Note you can only go 7 days back at the moment + Returns: + dict: The national forecast data. + """ logger.debug("Get national forecasts") @@ -182,8 +200,8 @@ def get_national_pvlive( #### Parameters - **regime**: can choose __in-day__ or __day-after__ - """ + """ logger.info(f"Get national PV Live estimates values " f"for regime {regime} for {user}") return get_truth_values_for_a_specific_gsp_from_database( @@ -191,26 +209,70 @@ def get_national_pvlive( ) -@router.get( - "/bmrs", - response_model=dict, - # dependencies=[Depends(get_auth_implicit_scheme())], - summary="Get BMRS Forecast", -) -# @cache_response +@router.get("/elexon", summary="Get elexon Solar Forecast") @limiter.limit(f"{N_CALLS_PER_HOUR}/hour") -def get_bmrs_forecast( +def get_elexon_forecast( request: Request, - # session: Session = Depends(get_session), - # user: Auth0User = Security(get_user()), -) -> dict: + start_datetime_utc: datetime = Query( + default=datetime.utcnow() - timedelta(days=3), description="Start date and time in UTC" + ), + end_datetime_utc: datetime = Query( + default=datetime.utcnow() + timedelta(days=3), description="End date and time in UTC" + ), + process_type: str = Query("Day Ahead", description="Process type"), +): """ + Fetch elexon Solar forecasts from the Elexon API. - This route returns the most recent BMRS forecast for each _target_time_. - - #### Parameters + #### Parameters: + - **start_datetime_utc** (datetime): The start date and time in UTC. + - **end_datetime_utc** (datetime): The end date and time in UTC. + - **process_type** (str): The type of process + (e.g., 'Day Ahead', 'Intraday Process' or 'Intraday Total'). + Returns: + SolarForecastResponse: The forecast data wrapped in a SolarForecastResponse model. """ - logger.debug("Get bmrs forecast") - return {"message": "This route is not yet implemented. Please check back later."} + try: + response = elexon_forecast_api.forecast_generation_wind_and_solar_day_ahead_get( + _from=start_datetime_utc.isoformat(), + to=end_datetime_utc.isoformat(), + process_type=process_type, + format="json", + ) + except Exception as e: + logger.error("Unhandled exception when collecting ELexon Data: %s", str(e)) + raise HTTPException( + status_code=500, detail="Internal Server Error when collecting Elexon Data" + ) + + if not response.data: + return SolarForecastResponse(data=[]) + + df = pd.DataFrame([item.to_dict() for item in response.data]) + logger.debug("DataFrame Columns: %s", df.columns) + logger.debug("DataFrame Sample: %s", df.head()) + + # Filter to include only solar forecasts + solar_df = df[df["business_type"] == "Solar generation"] + logger.debug("Filtered Solar DataFrame: %s", solar_df.head()) + + forecast_values = [] + for _, row in solar_df.iterrows(): + try: + forecast_values.append( + SolarForecastValue( + timestamp=pd.to_datetime(row["start_time"]).to_pydatetime(), + expected_power_generation_megawatts=row.get("quantity"), + ) + ) + except KeyError as e: + logger.error("KeyError: %s. Data: %s", str(e), row) + raise HTTPException(status_code=500, detail="Internal Server Error") + except Exception as e: + logger.error("Error during DataFrame to Model conversion: %s. Data: %s", str(e), row) + raise HTTPException(status_code=500, detail="Internal Server Error") + + result = SolarForecastResponse(data=forecast_values) + return result diff --git a/src/pydantic_models.py b/src/pydantic_models.py index ca01033..10f9f64 100644 --- a/src/pydantic_models.py +++ b/src/pydantic_models.py @@ -7,7 +7,7 @@ from nowcasting_datamodel.models import Forecast, ForecastSQL, ForecastValue, Location, LocationSQL from nowcasting_datamodel.models.utils import EnhancedBaseModel -from pydantic import Field, validator +from pydantic import BaseModel, Field, validator logger = logging.getLogger(__name__) @@ -214,3 +214,25 @@ class NationalForecast(Forecast): """One Forecast of generation at one timestamp""" forecast_values: List[NationalForecastValue] = Field(..., description="List of forecast values") + + +class SolarForecastValue(BaseModel): + """Represents a single solar forecast entry""" + + timestamp: datetime = Field(..., description="Timestamp of the forecast") + expected_power_generation_megawatts: Optional[float] = Field( + None, ge=0, description="Expected power generation in megawatts" + ) + + @validator("expected_power_generation_megawatts") + def result_check(cls, v): + """Round to 2 decimal places""" + if v is not None: + return round(v, 2) + return v + + +class SolarForecastResponse(BaseModel): + """Wrapper for a list of solar forecast values""" + + data: List[SolarForecastValue] = Field(..., description="List of solar forecast values") diff --git a/src/tests/test_elexon_forecast.py b/src/tests/test_elexon_forecast.py new file mode 100644 index 0000000..c0e4f25 --- /dev/null +++ b/src/tests/test_elexon_forecast.py @@ -0,0 +1,69 @@ +from typing import Optional +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytest +from elexonpy.api.generation_forecast_api import GenerationForecastApi + +from pydantic_models import BaseModel, SolarForecastResponse + +API_URL = "/v0/solar/GB/national/elexon" + + +class MockClass(BaseModel): + start_time: str + quantity: float + business_type: Optional[str] = "Solar generation" + + def to_dict(self): + return self.__dict__ + + +mock_data = [ + MockClass( + **{ + "start_time": "2024-07-24T16:00:00+00:00", + "quantity": 0, + } + ), + MockClass( + **{ + "start_time": "2024-07-24T16:30:00+00:00", + "quantity": 0, + } + ), +] +mock_response = MagicMock() +mock_response.data = mock_data + + +@patch.object(GenerationForecastApi, "forecast_generation_wind_and_solar_day_ahead_get") +def test_get_elexon_forecast_mock(mock_function, api_client): + # Set mock_response + mock_function.return_value = mock_response + + # Call the API endpoint + response = api_client.get(API_URL) + print("Response Headers:", response.headers) + # Assertions + assert response.status_code == 200 + assert response.headers.get("Content-Type") == "application/json" + + api_data = response.json()["data"] + assert len(api_data) == len(mock_data) + for i in range(len(api_data)): + assert api_data[i]["expected_power_generation_megawatts"] == mock_data[i].quantity + assert pd.Timestamp(api_data[i]["timestamp"]) == pd.Timestamp(mock_data[i].start_time) + + +@pytest.mark.integration +def test_get_elexon_forecast(api_client): + response = api_client.get(API_URL) + + # Assertions + assert response.status_code == 200 + assert response.headers.get("Content-Type") == "application/json" + + solar_forecast = SolarForecastResponse(**response.json()) + + assert len(solar_forecast.data) > 0