From 17318e398e54c7d79e08fd8f28c59a5f472aa0a8 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Mon, 28 Oct 2024 16:45:47 +0000
Subject: [PATCH] feat(ecmwf_realtime): Finish ecmwf realtime adaptor

---
 Containerfile                                 |  6 +-
 src/nwp_consumer/{cmd.tmp => cmd}/__init__.py |  0
 src/nwp_consumer/{cmd.tmp => cmd}/main.py     |  4 +-
 .../internal/entities/coordinates.py          |  6 ++
 .../model_repositories/ecmwf_realtime.py      | 83 +++++++++++++++----
 .../model_repositories/test_ecmwf_realtime.py | 69 ++++++++++++---
 6 files changed, 138 insertions(+), 30 deletions(-)
 rename src/nwp_consumer/{cmd.tmp => cmd}/__init__.py (100%)
 rename src/nwp_consumer/{cmd.tmp => cmd}/main.py (97%)

diff --git a/Containerfile b/Containerfile
index 4124bb4d..864e8f49 100644
--- a/Containerfile
+++ b/Containerfile
@@ -31,9 +31,11 @@ COPY pyproject.toml /_lock/
 # This layer is cached until uv.lock or pyproject.toml change.
 # Delete any unwanted parts of the installed packages to reduce size
 RUN --mount=type=cache,target=/root/.cache \
+    apt-get update && apt-get install build-essential -y && \
     echo "Creating virtualenv at /venv" && \
-    conda create -qy -p /venv python=3.12 numcodecs && \
-    echo "Installing dependencies into /venv" && \
+    conda create -qy -p /venv python=3.12 numcodecs
+RUN which gcc
+RUN echo "Installing dependencies into /venv" && \
     cd /_lock && \
     mkdir src && \
     uv sync --no-dev --no-install-project && \
diff --git a/src/nwp_consumer/cmd.tmp/__init__.py b/src/nwp_consumer/cmd/__init__.py
similarity index 100%
rename from src/nwp_consumer/cmd.tmp/__init__.py
rename to src/nwp_consumer/cmd/__init__.py
diff --git a/src/nwp_consumer/cmd.tmp/main.py b/src/nwp_consumer/cmd/main.py
similarity index 97%
rename from src/nwp_consumer/cmd.tmp/main.py
rename to src/nwp_consumer/cmd/main.py
index 0c3e0153..6d001863 100644
--- a/src/nwp_consumer/cmd.tmp/main.py
+++ b/src/nwp_consumer/cmd/main.py
@@ -21,9 +21,9 @@ def parse_env() -> Adaptors:
         case None:
             log.error("MODEL_REPOSITORY is not set in environment.")
             sys.exit(1)
-        case "ceda-mog":
+        case "ceda":
             model_repository_adaptor = repositories.CedaMetOfficeGlobalModelRepository
-        case "ecmwf-hres-realtime":
+        case "ecmwf-realtime-s3":
             model_repository_adaptor = repositories.ECMWFRealTimeS3ModelRepository
         case _ as model:
             log.error(f"Unknown model: {model}")
diff --git a/src/nwp_consumer/internal/entities/coordinates.py b/src/nwp_consumer/internal/entities/coordinates.py
index 54712969..565fe707 100644
--- a/src/nwp_consumer/internal/entities/coordinates.py
+++ b/src/nwp_consumer/internal/entities/coordinates.py
@@ -137,6 +137,12 @@ def from_pandas(
                 "as required keys 'init_time', 'step', and 'variable' are not all present. "
                 f"Got: '{list(pd_indexes.keys())}'",
             ))
+        if not all(len(pd_indexes[key].to_list()) > 0 for key in ["init_time", "step", "variable"]):
+            return Failure(ValueError(
+                f"Cannot create {cls.__name__} instance from pandas indexes "
+                "as the 'init_time', 'step', and 'variable' dimensions must have "
+                "at least one coordinate value.",
+            ))
         input_parameter_set: set[str] = set(pd_indexes["variable"].to_list())
         known_parameter_set: set[str] = {str(p) for p in Parameter}
         if not input_parameter_set.issubset(known_parameter_set):
diff --git a/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py b/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py
index 599c3862..b64f234b 100644
--- a/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py
+++ b/src/nwp_consumer/internal/repositories/model_repositories/ecmwf_realtime.py
@@ -1,14 +1,37 @@
 """Model repository implementation for ECMWF live data from S3.
 
+Documented Structure
+--------------------
+
 When getting live or realtime data from ECMWF, grib files are sent by
 a data provider to a location of choice, in this case an S3 bucket.
+The `ECMWF Dissemination Schedule <https://confluence.ecmwf.int/display/DAC/Dissemination+schedule>`_
+describes the naming convention and time ordering for these files:
+
+- A 2-character prefix
+- A 1-character dissemination stream indicator
+- 8 digits representing the initialization time in the format mmddHHMM
+- 8 digits representing the target time in the format mmddHHMM
+- 1 digit representing the file number(?)
+
+So a file named `A2D10250000102601001` would be for an initialization
+time of 2024-10-25 00:00 and a target time of 2024-10-26 01:00 (step of 25 hours).
+
+The file contents are specific to the order agreed with the data provider.
+For the order that OCF has created, there are four distinct datasets.
+This is because OCF has ordered two separate regions and 17 variables,
+which are split across two datasets.
+
+Also, some of the data contains larger steps than we are interested in due
+to necessities in the order creation process.
+
 """
 
 import datetime as dt
-import functools
 import logging
 import os
 import pathlib
+import re
 from collections.abc import Callable, Iterator
 from typing import override
 
@@ -101,9 +124,11 @@ def fetch_init_data(self, it: dt.datetime) \
             urls: list[str] = [
                 f"s3://{f}"
                 for f in self._fs.ls(f"{self.bucket}/ecmwf")
-                if it.strftime(
-                    os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2") + "D%m%d%H%M",
-                ) in f
+                if self._wanted_file(
+                    filename=f.split("/")[-1],
+                    it=it,
+                    max_step=max(self.model().expected_coordinates.step),
+                )
             ]
         except Exception as e:
             yield delayed(Failure)(ValueError(
@@ -117,7 +142,7 @@ def fetch_init_data(self, it: dt.datetime) \
             yield delayed(Failure)(ValueError(
                 f"No raw files found for init time '{it.strftime('%Y-%m-%d %H:%M')}' "
                 f"in bucket path '{self.bucket}/ecmwf'. Ensure files exist at the given path "
-                "named with the '...MMDDHHMM...' pattern.",
+                "named with the expected pattern, e.g. 'A2S10250000102603001'.",
             ))
 
         for url in urls:
@@ -151,9 +176,7 @@ def _download_and_convert(self, url: str) -> ResultE[list[xr.DataArray]]:
         Args:
             url: The URL of the file to download.
         """
-        return self._download(url=url).bind(
-            functools.partial(self._convert, model=self.model()),
-        )
+        return self._download(url=url).bind(self._convert)
 
     def _download(self, url: str) -> ResultE[pathlib.Path]:
         """Download an ECMWF realtime file from S3.
@@ -198,7 +221,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:
         return Success(local_path)
 
     @staticmethod
-    def _convert(path: pathlib.Path, model: entities.ModelMetadata) -> ResultE[list[xr.DataArray]]:
+    def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
         """Convert a grib file to an xarray DataArray.
 
         Args:
@@ -220,12 +243,12 @@ def _convert(path: pathlib.Path, model: entities.ModelMetadata) -> ResultE[list[
                 da: xr.DataArray = (
                     ds.pipe(ECMWFRealTimeS3ModelRepository._rename_vars)
                     .rename(name_dict={"time": "init_time"})
-                    .expand_dims(dim=["init_time", "step"])
+                    .expand_dims(dim="init_time")
+                    .expand_dims(dim="step")
                     .to_dataarray(name=ECMWFRealTimeS3ModelRepository.model().name)
                 )
                 da = (
-                    da.where(cond=da["step"] <= max(model.expected_coordinates.step), drop=True)
-                    .drop_vars(
+                    da.drop_vars(
                         names=[
                             c for c in ds.coords
                             if c not in ["init_time", "step", "variable", "latitude", "longitude"]
@@ -240,10 +263,14 @@ def _convert(path: pathlib.Path, model: entities.ModelMetadata) -> ResultE[list[
                 return Failure(ValueError(
                     f"Error processing dataset {i} from '{path}' to DataArray: {e}",
                 ))
-            # Put each variable into its own dataarray as each file does not contain a full set
-            # and so may not produce a contiguous subset of the expected coordinates
+            # Put each variable into its own DataArray:
+            # * Each raw file does not contain a full set of parameters
+            # * and so may not produce a contiguous subset of the expected coordinates.
             processed_das.extend(
-                [da.where(cond=da["variable"] == v, drop=True) for v in da["variable"].values],
+                [
+                    da.where(cond=da["variable"] == v, drop=True)
+                    for v in da["variable"].values
+                ],
             )
 
         return Success(processed_das)
@@ -277,3 +304,29 @@ def _rename_vars(ds: xr.Dataset) -> xr.Dataset:
             if old in ds.data_vars:
                 ds = ds.rename({old: new})
         return ds
+
+    @staticmethod
+    def _wanted_file(filename: str, it: dt.datetime, max_step: int) -> bool:
+        """Determine if the file is wanted based on the init time.
+
+        See module docstring for the file naming convention.
+        Returns True if the filename describes data corresponding to the input
+        initialisation time and model metadata.
+
+        Args:
+            filename: The name of the file.
+            it: The init time of the model run.
+            max_step: The maximum step in hours to consider.
+        """
+        prefix: str = os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2")
+        pattern: str = r"^" + prefix + r"[DS](\d{8})(\d{8})\d$"
+        match: re.Match | None = re.search(pattern=pattern, string=filename)
+        if match is None:
+            return False
+        if it.strftime("%m%d%H%M") != match.group(1):
+            return False
+        tt: dt.datetime = dt.datetime.strptime(
+            str(it.year) + match.group(2) + "+0000",
+            "%Y%m%d%H%M%z",
+        )
+        return tt <= it + dt.timedelta(hours=max_step)
diff --git a/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py b/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py
index b9825e70..b799a36d 100644
--- a/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py
+++ b/src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py
@@ -2,15 +2,17 @@
 import datetime as dt
 import os
 import unittest
+from typing import TYPE_CHECKING
 
-from returns.pipeline import flow, is_successful
-from returns.pointfree import bind
-import xarray as xr
+from returns.pipeline import is_successful
 
-from nwp_consumer.internal import entities
-
-from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
 from ...entities import NWPDimensionCoordinateMap
+from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
+
+if TYPE_CHECKING:
+    import xarray as xr
+
+    from nwp_consumer.internal import entities
 
 
 class TestECMWFRealTimeS3ModelRepository(unittest.TestCase):
@@ -19,7 +21,7 @@ class TestECMWFRealTimeS3ModelRepository(unittest.TestCase):
     @unittest.skipIf(
         condition="CI" in os.environ,
         reason="Skipping integration test that requires S3 access.",
-    )
+    )  # TODO: Move into integration tests, or remove
     def test__download_and_convert(self) -> None:
         """Test the _download_and_convert method."""
 
@@ -36,9 +38,7 @@ def test__download_and_convert(self) -> None:
         urls: list[str] = [
             f"s3://{f}"
             for f in c._fs.ls(f"{c.bucket}/ecmwf")
-            if test_it.strftime(
-                os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2") + "D%m%d%H%M",
-            ) in f
+            if c._wanted_file(f.split("/")[-1], test_it, max(c.model().expected_coordinates.step))
         ]
 
         for url in urls:
@@ -51,4 +51,51 @@ def test__download_and_convert(self) -> None:
                 determine_region_result = NWPDimensionCoordinateMap.from_xarray(da).bind(
                     test_coordinates.determine_region,
                 )
-                self.assertTrue(is_successful(determine_region_result), msg=f"Error: {determine_region_result}")
\ No newline at end of file
+                self.assertTrue(
+                    is_successful(determine_region_result),
+                    msg=f"Error: {determine_region_result}",
+                )
+
+    def test__wanted_file(self) -> None:
+        """Test the _wanted_file method."""
+
+        c: ECMWFRealTimeS3ModelRepository = ECMWFRealTimeS3ModelRepository()
+
+        @dataclasses.dataclass
+        class TestCase:
+            name: str
+            filename: str
+            expected: bool
+
+        test_it: dt.datetime = dt.datetime(2024, 10, 25, 12, tzinfo=dt.UTC)
+
+        tests: list[TestCase] = [
+            TestCase(
+                name="valid_filename",
+                filename="A2D10251200102516001",
+                expected=True,
+            ),
+            TestCase(
+                name="invalid_filename",
+                filename="GGC10251200102516002",
+                expected=False,
+            ),
+            TestCase(
+                name="unexpected_extension",
+                filename="A2D10251200102516001.nc",
+                expected=False,
+            ),
+            TestCase(
+                name="step_too_large",
+                filename="A2D10251200102916001",
+                expected=False,
+            ),
+        ]
+
+        for t in tests:
+            with self.subTest(name=t.name):
+                result = c._wanted_file(
+                    filename=t.filename,
+                    it=test_it,
+                    max_step=max(c.model().expected_coordinates.step))
+                self.assertEqual(result, t.expected)