Skip to content

Commit

Permalink
feat(gfs): Add grib convert tests
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 25, 2024
1 parent ec531e9 commit 382126a
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def repository() -> entities.ModelRepositoryMetadata:
is_order_based=False,
running_hours=[0, 6, 12, 18],
delay_minutes=(60 * 24 * 7), # 1 week
max_connections=100,
max_connections=20,
required_env=[],
optional_env={},
postprocess_options=entities.PostProcessOptions(),
Expand Down Expand Up @@ -173,28 +173,29 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:

if local_path.stat().st_size != fs.info(url)["size"]:
return Failure(ValueError(
f"Failed to download file from S3 at '{url}'. "
"File size mismatch. File may be corrupted.",
f"File size mismatch from file at '{url}': "
f"{local_path.stat().st_size} != {fs.info(url)['size']} (remote). "
"File may be corrupted.",
))

# Also download the associated index file
# * This isn't critical, but speeds up reading the file in when converting
# TODO: Re-incorporate this when https://github.com/ecmwf/cfgrib/issues/350
# TODO: is resolved. Currently downloaded index files are ignored due to
# TODO: path differences once downloaded.
index_url: str = url + ".idx"
index_path: pathlib.Path = local_path.with_suffix(".grib.idx")
try:
with index_path.open("wb") as lf, fs.open(index_url, "rb") as rf:
for chunk in iter(lambda: rf.read(12 * 1024), b""):
lf.write(chunk)
lf.flush()
except Exception as e:
log.warning(
f"Failed to download index file from S3 at '{url}'. "
"This will require a manual indexing when converting the file. "
f"Encountered error: {e}",
)
# index_url: str = url + ".idx"
# index_path: pathlib.Path = local_path.with_suffix(".grib.idx")
# try:
# with index_path.open("wb") as lf, fs.open(index_url, "rb") as rf:
# for chunk in iter(lambda: rf.read(12 * 1024), b""):
# lf.write(chunk)
# lf.flush()
# except Exception as e:
# log.warning(
# f"Failed to download index file from S3 at '{url}'. "
# "This will require a manual indexing when converting the file. "
# f"Encountered error: {e}",
# )

return Success(local_path)

Expand Down Expand Up @@ -273,7 +274,15 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
return Failure(ValueError(
f"Error processing dataset {i} from '{path}' to DataArray: {e}",
))
processed_das.append(da)
# Put each variable into its own DataArray:
# * Each raw file does not contain a full set of parameters
# * and so may not produce a contiguous subset of the expected coordinates.
processed_das.extend(
[
da.where(cond=da["variable"] == v, drop=True)
for v in da["variable"].values
],
)

return Success(processed_das)

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import unittest
from typing import TYPE_CHECKING

from returns.result import Success, Failure, ResultE
from returns.result import Failure, ResultE, Success

from ...entities import NWPDimensionCoordinateMap
from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import dataclasses
import datetime as dt
import os
import pathlib
import unittest
from typing import TYPE_CHECKING

import s3fs
from returns.pipeline import is_successful
from returns.result import Failure, ResultE, Success

from ...entities import NWPDimensionCoordinateMap
from .noaa_s3 import NOAAS3ModelRepository
Expand Down Expand Up @@ -109,3 +110,51 @@ class TestCase:
)
self.assertEqual(result, t.expected)


def test__convert(self) -> None:
    """Ensure grib fixtures convert to DataArrays that map onto expected regions."""

    @dataclasses.dataclass
    class ConvertTest:
        # Name of the grib fixture file (located next to this test module).
        filename: str
        # Whether conversion/region-determination is expected to fail.
        should_error: bool

    cases: list[ConvertTest] = [
        ConvertTest(filename="test_HRES-GFS_10u.grib", should_error=False),
        ConvertTest(filename="test_HRES-GFS_lcc.grib", should_error=False),
        ConvertTest(filename="test_HRES_GFS_r.grib", should_error=True),
    ]

    # Narrow the model's expected coordinates down to the single init time
    # that the test fixture files were produced for.
    wanted_coords = dataclasses.replace(
        NOAAS3ModelRepository.model().expected_coordinates,
        init_time=[dt.datetime(2021, 5, 9, 6, tzinfo=dt.UTC)],
    )

    fixture_dir = pathlib.Path(__file__).parent.absolute()
    for case in cases:
        with self.subTest(name=case.filename):
            # Convert the fixture file into a list of DataArrays.
            convert_result = NOAAS3ModelRepository._convert(
                path=fixture_dir / case.filename,
            )
            # Via do-notation: for every converted DataArray, derive its
            # coordinate map and determine its region within the expected
            # coordinates; any Failure along the way short-circuits.
            region_result: ResultE[dict[str, slice]] = convert_result.do(
                region
                for das in convert_result
                for da in das
                for region in NWPDimensionCoordinateMap.from_xarray(da).bind(
                    wanted_coords.determine_region,
                )
            )
            wanted_type = Failure if case.should_error else Success
            self.assertIsInstance(region_result, wanted_type, msg=f"{region_result}")

0 comments on commit 382126a

Please sign in to comment.