Skip to content

Commit

Permalink
feat(ceda): Add grib convert tests
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 27, 2024
1 parent 84b72a0 commit 8c44c92
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/entities/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def metadata(self) -> ParameterData:
"the wind in the eastward direction.",
units="m/s",
limits=ParameterLimits(upper=100, lower=-100),
alternate_shortnames=["u10"],
alternate_shortnames=["u10", "u"],
)
case self.WIND_V_COMPONENT_10m.name:
return ParameterData(
Expand All @@ -192,7 +192,7 @@ def metadata(self) -> ParameterData:
units="m/s",
# Non-tornadic winds are usually < 100m/s
limits=ParameterLimits(upper=100, lower=-100),
alternate_shortnames=["v10"],
alternate_shortnames=["v10", "v"],
)
case self.WIND_U_COMPONENT_100m.name:
return ParameterData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,21 +277,35 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
),
)
try:
ds = entities.Parameter.rename_else_drop_ds_vars(
ds=ds,
allowed_parameters=CEDAFTPModelRepository.model().expected_coordinates.variable,
)
# Ignore datasets with no variables of interest
if len(ds.data_vars) == 0:
return Failure(OSError(
f"No relevant variables found in '{path}'. "
"Ensure file contains the expected variables, "
"and that desired variables are not being dropped.",
))
da: xr.DataArray = (
entities.Parameter.rename_else_drop_ds_vars(
ds=ds,
allowed_parameters=CEDAFTPModelRepository.model().expected_coordinates.variable,
)
.sel(step=[np.timedelta64(i, "h") for i in range(0, 48, 1)])
.expand_dims(dim={"init_time": [ds["time"].values]})
.drop_vars(
names=[
v
for v in ds.coords.variables
if v not in ["init_time", "step", "latitude", "longitude"]
],
)
ds.sel(
step=slice(
np.timedelta64(0, "h"),
np.timedelta64(
CEDAFTPModelRepository.model().expected_coordinates.step[-1],
"h",
),
))
.drop_vars(names=[
c for c in ds.coords if c not in ["time", "step", "latitude", "longitude"]
])
.rename(name_dict={"time": "init_time"})
.expand_dims(dim="init_time")
.to_dataarray(name=CEDAFTPModelRepository.model().name)
)
da = (
da
.transpose("init_time", "step", "variable", "latitude", "longitude")
# Remove the last value of the longitude dimension as it overlaps with the next file
# Reverse the latitude dimension to be in descending order
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import dataclasses
import datetime as dt
import os
import pathlib
import unittest

from returns.pipeline import flow, is_successful
from returns.pipeline import flow
from returns.pointfree import bind
from returns.result import Failure, ResultE, Success

from nwp_consumer.internal import entities

from ...entities import NWPDimensionCoordinateMap
from .ceda_ftp import CEDAFTPModelRepository


Expand All @@ -22,7 +25,7 @@ def test__download_and_convert(self) -> None:
"""Test the _download_and_convert method."""

auth_result = CEDAFTPModelRepository.authenticate()
self.assertTrue(is_successful(auth_result), msg=f"Error: {auth_result}")
self.assertIsInstance(auth_result, Success, msg=f"{auth_result!s}")
c = auth_result.unwrap()

test_it: dt.datetime = dt.datetime(2021, 1, 1, 0, tzinfo=dt.UTC)
Expand Down Expand Up @@ -56,7 +59,7 @@ def url(self) -> str:
with (self.subTest(area=test.area)):
result = c._download_and_convert(test.url)

self.assertTrue(is_successful(result), msg=f"Error: {result}")
self.assertIsInstance(result, Success, msg=f"{result!s}")

for da in result.unwrap():
# Check resultant arrays are a subset of the expected coordinates
Expand All @@ -66,11 +69,54 @@ def url(self) -> str:
bind(test_coordinates.determine_region),
)

self.assertTrue(
is_successful(subset_result),
msg=f"Error: {subset_result}",
)
self.assertIsInstance(subset_result, Success, msg=f"{subset_result!s}")

def test__convert(self) -> None:
"""Test the _convert method."""

@dataclasses.dataclass
class TestCase:
filename: str
should_error: bool

tests: list[TestCase] = [
TestCase(
filename="test_UM-Global_10u-AreaC.grib",
should_error=False,
),
TestCase(
filename="test_UM-Global_ssrd_AreaE.grib",
should_error=False,
),
TestCase(
filename="test_HRES-IFS_10u.grib",
should_error=True,
),
]

expected_coords = dataclasses.replace(
CEDAFTPModelRepository.model().expected_coordinates,
init_time=[dt.datetime(2024, 11, 5, 0, tzinfo=dt.UTC)],
)

for t in tests:
with self.subTest(name=t.filename):
# Attempt to convert the file
result = CEDAFTPModelRepository._convert(
path=pathlib.Path(__file__).parent.absolute() / t.filename,
)
region_result: ResultE[dict[str, slice]] = result.do(
region
for das in result
for da in das
for region in NWPDimensionCoordinateMap.from_xarray(da).bind(
expected_coords.determine_region,
)
)
if t.should_error:
self.assertIsInstance(region_result, Failure, msg=f"{region_result}")
else:
self.assertIsInstance(region_result, Success, msg=f"{region_result}")

if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,4 @@ class TestCase:
self.assertIsInstance(region_result, Failure, msg=f"{region_result}")
else:
self.assertIsInstance(region_result, Success, msg=f"{region_result}")

4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/services/consumer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[str]:
if isinstance(fetch_result, Failure):
log.error(
f"Error fetching data for init time '{it:%Y-%m-%d %H:%M}' "
f"and model {self.mr.repository().name}: {fetch_result.failure()!s}",
f"and model {self.mr.model().name}: {fetch_result.failure()!s}",
)
failed_etls += 1
continue
Expand All @@ -104,7 +104,7 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[str]:
if isinstance(write_result, Failure):
log.error(
f"Error writing data for init time '{it:%Y-%m-%d %H:%M}' "
f"and model {self.mr.repository().name}: "
f"and model {self.mr.model().name}: "
f"{write_result.failure()!s}",
)
failed_etls += 1
Expand Down

0 comments on commit 8c44c92

Please sign in to comment.