diff --git a/src/nwp_consumer/internal/entities/parameters.py b/src/nwp_consumer/internal/entities/parameters.py index 2c20ea68..c022daeb 100644 --- a/src/nwp_consumer/internal/entities/parameters.py +++ b/src/nwp_consumer/internal/entities/parameters.py @@ -181,7 +181,7 @@ def metadata(self) -> ParameterData: "the wind in the eastward direction.", units="m/s", limits=ParameterLimits(upper=100, lower=-100), - alternate_shortnames=["u10"], + alternate_shortnames=["u10", "u"], ) case self.WIND_V_COMPONENT_10m.name: return ParameterData( @@ -192,7 +192,7 @@ def metadata(self) -> ParameterData: units="m/s", # Non-tornadic winds are usually < 100m/s limits=ParameterLimits(upper=100, lower=-100), - alternate_shortnames=["v10"], + alternate_shortnames=["v10", "v"], ) case self.WIND_U_COMPONENT_100m.name: return ParameterData( diff --git a/src/nwp_consumer/internal/repositories/model_repositories/ceda_ftp.py b/src/nwp_consumer/internal/repositories/model_repositories/ceda_ftp.py index d03e6a4f..faed8af2 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/ceda_ftp.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/ceda_ftp.py @@ -277,21 +277,35 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]: ), ) try: + ds = entities.Parameter.rename_else_drop_ds_vars( + ds=ds, + allowed_parameters=CEDAFTPModelRepository.model().expected_coordinates.variable, + ) + # Ignore datasets with no variables of interest + if len(ds.data_vars) == 0: + return Failure(OSError( + f"No relevant variables found in '{path}'. " + "Ensure file contains the expected variables, " + "and that desired variables are not being dropped.", + )) da: xr.DataArray = ( - entities.Parameter.rename_else_drop_ds_vars( - ds=ds, - allowed_parameters=CEDAFTPModelRepository.model().expected_coordinates.variable, - ) - .sel(step=[np.timedelta64(i, "h") for i in range(0, 48, 1)]) - .expand_dims(dim={"init_time": [ds["time"].values]}) - .drop_vars( - names=[ - v - for v in ds.coords.variables - if v not in ["init_time", "step", "latitude", "longitude"] - ], - ) + ds.sel( + step=slice( + np.timedelta64(0, "h"), + np.timedelta64( + CEDAFTPModelRepository.model().expected_coordinates.step[-1], + "h", + ), + )) + .drop_vars(names=[ + c for c in ds.coords if c not in ["time", "step", "latitude", "longitude"] + ]) + .rename(name_dict={"time": "init_time"}) + .expand_dims(dim="init_time") .to_dataarray(name=CEDAFTPModelRepository.model().name) + ) + da = ( + da .transpose("init_time", "step", "variable", "latitude", "longitude") # Remove the last value of the longitude dimension as it overlaps with the next file # Reverse the latitude dimension to be in descending order diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_UM-Global_10u-AreaC.grib b/src/nwp_consumer/internal/repositories/model_repositories/test_UM-Global_10u-AreaC.grib new file mode 100644 index 00000000..5b71c7d2 Binary files /dev/null and b/src/nwp_consumer/internal/repositories/model_repositories/test_UM-Global_10u-AreaC.grib differ diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_UM-Global_ssrd_AreaE.grib b/src/nwp_consumer/internal/repositories/model_repositories/test_UM-Global_ssrd_AreaE.grib new file mode 100644 index 00000000..ba95ef2c Binary files /dev/null and b/src/nwp_consumer/internal/repositories/model_repositories/test_UM-Global_ssrd_AreaE.grib differ diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_ceda_ftp.py b/src/nwp_consumer/internal/repositories/model_repositories/test_ceda_ftp.py index 3d372c24..abfe81de 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/test_ceda_ftp.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/test_ceda_ftp.py @@ -1,13 +1,16 @@ import dataclasses import datetime as dt import os +import pathlib import unittest -from returns.pipeline import flow, is_successful +from returns.pipeline import flow from returns.pointfree import bind +from returns.result import Failure, ResultE, Success from nwp_consumer.internal import entities +from ...entities import NWPDimensionCoordinateMap from .ceda_ftp import CEDAFTPModelRepository @@ -22,7 +25,7 @@ def test__download_and_convert(self) -> None: """Test the _download_and_convert method.""" auth_result = CEDAFTPModelRepository.authenticate() - self.assertTrue(is_successful(auth_result), msg=f"Error: {auth_result}") + self.assertIsInstance(auth_result, Success, msg=f"{auth_result!s}") c = auth_result.unwrap() test_it: dt.datetime = dt.datetime(2021, 1, 1, 0, tzinfo=dt.UTC) @@ -56,7 +59,7 @@ def url(self) -> str: with (self.subTest(area=test.area)): result = c._download_and_convert(test.url) - self.assertTrue(is_successful(result), msg=f"Error: {result}") + self.assertIsInstance(result, Success, msg=f"{result!s}") for da in result.unwrap(): # Check resultant arrays are a subset of the expected coordinates @@ -66,11 +69,54 @@ def url(self) -> str: bind(test_coordinates.determine_region), ) - self.assertTrue( - is_successful(subset_result), - msg=f"Error: {subset_result}", - ) + self.assertIsInstance(subset_result, Success, msg=f"{subset_result!s}") + + def test__convert(self) -> None: + """Test the _convert method.""" + + @dataclasses.dataclass + class TestCase: + filename: str + should_error: bool + + tests: list[TestCase] = [ + TestCase( + filename="test_UM-Global_10u-AreaC.grib", + should_error=False, + ), + TestCase( + filename="test_UM-Global_ssrd_AreaE.grib", + should_error=False, + ), + TestCase( + filename="test_HRES-IFS_10u.grib", + should_error=True, + ), + ] + expected_coords = dataclasses.replace( + CEDAFTPModelRepository.model().expected_coordinates, + init_time=[dt.datetime(2024, 11, 5, 0, tzinfo=dt.UTC)], + ) + + for t in tests: + with self.subTest(name=t.filename): + # Attempt to convert the file + result = CEDAFTPModelRepository._convert( + path=pathlib.Path(__file__).parent.absolute() / t.filename, + ) + region_result: ResultE[dict[str, slice]] = result.do( + region + for das in result + for da in das + for region in NWPDimensionCoordinateMap.from_xarray(da).bind( + expected_coords.determine_region, + ) + ) + if t.should_error: + self.assertIsInstance(region_result, Failure, msg=f"{region_result}") + else: + self.assertIsInstance(region_result, Success, msg=f"{region_result}") if __name__ == "__main__": unittest.main() diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_mo_datahub.py b/src/nwp_consumer/internal/repositories/model_repositories/test_mo_datahub.py index 60f64cc6..90415486 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/test_mo_datahub.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/test_mo_datahub.py @@ -79,3 +79,4 @@ class TestCase: self.assertIsInstance(region_result, Failure, msg=f"{region_result}") else: self.assertIsInstance(region_result, Success, msg=f"{region_result}") + diff --git a/src/nwp_consumer/internal/services/consumer_service.py b/src/nwp_consumer/internal/services/consumer_service.py index 749c90b9..a309f490 100644 --- a/src/nwp_consumer/internal/services/consumer_service.py +++ b/src/nwp_consumer/internal/services/consumer_service.py @@ -95,7 +95,7 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[str]: if isinstance(fetch_result, Failure): log.error( f"Error fetching data for init time '{it:%Y-%m-%d %H:%M}' " - f"and model {self.mr.repository().name}: {fetch_result.failure()!s}", + f"and model {self.mr.model().name}: {fetch_result.failure()!s}", ) failed_etls += 1 continue @@ -104,7 +104,7 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[str]: if isinstance(write_result, Failure): log.error( f"Error writing data for init time '{it:%Y-%m-%d %H:%M}' " - f"and model {self.mr.repository().name}: " + f"and model {self.mr.model().name}: " f"{write_result.failure()!s}", ) failed_etls += 1