diff --git a/src/nwp_consumer/internal/repositories/model_repositories/noaa_s3.py b/src/nwp_consumer/internal/repositories/model_repositories/noaa_s3.py index 1766ad5..d95848d 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/noaa_s3.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/noaa_s3.py @@ -45,7 +45,7 @@ def repository() -> entities.ModelRepositoryMetadata: is_order_based=False, running_hours=[0, 6, 12, 18], delay_minutes=(60 * 24 * 7), # 1 week - max_connections=100, + max_connections=20, required_env=[], optional_env={}, postprocess_options=entities.PostProcessOptions(), @@ -173,8 +173,9 @@ def _download(self, url: str) -> ResultE[pathlib.Path]: if local_path.stat().st_size != fs.info(url)["size"]: return Failure(ValueError( - f"Failed to download file from S3 at '{url}'. " - "File size mismatch. File may be corrupted.", + f"File size mismatch from file at '{url}': " + f"{local_path.stat().st_size} != {fs.info(url)['size']} (remote). " + "File may be corrupted.", )) # Also download the associated index file @@ -182,19 +183,19 @@ def _download(self, url: str) -> ResultE[pathlib.Path]: # TODO: Re-incorporate this when https://github.com/ecmwf/cfgrib/issues/350 # TODO: is resolved. Currently downloaded index files are ignored due to # TODO: path differences once downloaded. - index_url: str = url + ".idx" - index_path: pathlib.Path = local_path.with_suffix(".grib.idx") - try: - with index_path.open("wb") as lf, fs.open(index_url, "rb") as rf: - for chunk in iter(lambda: rf.read(12 * 1024), b""): - lf.write(chunk) - lf.flush() - except Exception as e: - log.warning( - f"Failed to download index file from S3 at '{url}'. " - "This will require a manual indexing when converting the file. " - f"Encountered error: {e}", - ) + # index_url: str = url + ".idx" + # index_path: pathlib.Path = local_path.with_suffix(".grib.idx") + # try: + # with index_path.open("wb") as lf, fs.open(index_url, "rb") as rf: + # for chunk in iter(lambda: rf.read(12 * 1024), b""): + # lf.write(chunk) + # lf.flush() + # except Exception as e: + # log.warning( + # f"Failed to download index file from S3 at '{url}'. " + # "This will require a manual indexing when converting the file. " + # f"Encountered error: {e}", + # ) return Success(local_path) @@ -273,7 +274,15 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]: return Failure(ValueError( f"Error processing dataset {i} from '{path}' to DataArray: {e}", )) - processed_das.append(da) + # Put each variable into its own DataArray: + # * Each raw file does not contain a full set of parameters + # * and so may not produce a contiguous subset of the expected coordinates. + processed_das.extend( + [ + da.where(cond=da["variable"] == v, drop=True) + for v in da["variable"].values + ], + ) return Success(processed_das) diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_10u.grib b/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_10u.grib new file mode 100644 index 0000000..59d9d96 Binary files /dev/null and b/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_10u.grib differ diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_lcc.grib b/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_lcc.grib new file mode 100644 index 0000000..0ad0545 Binary files /dev/null and b/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_lcc.grib differ diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_r.grib b/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_r.grib new file mode 100644 index 0000000..40d16d2 Binary files /dev/null and b/src/nwp_consumer/internal/repositories/model_repositories/test_HRES-GFS_r.grib differ diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py b/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py index a8b2e49..9465cd8 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py @@ -5,7 +5,7 @@ import unittest from typing import TYPE_CHECKING -from returns.result import Success, Failure, ResultE +from returns.result import Failure, ResultE, Success from ...entities import NWPDimensionCoordinateMap from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_s3.py b/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_s3.py index 2e75452..844b0c2 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_s3.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/test_noaa_s3.py @@ -1,11 +1,12 @@ import dataclasses import datetime as dt import os +import pathlib import unittest from typing import TYPE_CHECKING import s3fs -from returns.pipeline import is_successful +from returns.result import Failure, ResultE, Success from ...entities import NWPDimensionCoordinateMap from .noaa_s3 import NOAAS3ModelRepository @@ -109,3 +110,51 @@ class TestCase: ) self.assertEqual(result, t.expected) + + def test__convert(self) -> None: + """Test the _convert method.""" + + @dataclasses.dataclass + class TestCase: + filename: str + should_error: bool + + tests: list[TestCase] = [ + TestCase( + filename="test_HRES-GFS_10u.grib", + should_error=False, + ), + TestCase( + filename="test_HRES-GFS_lcc.grib", + should_error=False, + ), + TestCase( + filename="test_HRES_GFS_r.grib", + should_error=True, + ), + ] + + expected_coords = dataclasses.replace( + NOAAS3ModelRepository.model().expected_coordinates, + init_time=[dt.datetime(2021, 5, 9, 6, tzinfo=dt.UTC)], + ) + + for t in tests: + with self.subTest(name=t.filename): + # Attempt to convert the file + result = NOAAS3ModelRepository._convert( + path=pathlib.Path(__file__).parent.absolute() / t.filename, + ) + region_result: ResultE[dict[str, slice]] = result.do( + region + for das in result + for da in das + for region in NWPDimensionCoordinateMap.from_xarray(da).bind( + expected_coords.determine_region, + ) + ) + if t.should_error: + self.assertIsInstance(region_result, Failure, msg=f"{region_result}") + else: + self.assertIsInstance(region_result, Success, msg=f"{region_result}") +