feat(ecmwf_realtime): Finish ecmwf realtime adaptor
devsjc committed Oct 28, 2024
1 parent f07b48c commit 17318e3
Showing 6 changed files with 138 additions and 30 deletions.
6 changes: 4 additions & 2 deletions Containerfile
@@ -31,9 +31,11 @@ COPY pyproject.toml /_lock/
 # This layer is cached until uv.lock or pyproject.toml change.
 # Delete any unwanted parts of the installed packages to reduce size
 RUN --mount=type=cache,target=/root/.cache \
+    apt-get update && apt-get install build-essential -y && \
     echo "Creating virtualenv at /venv" && \
-    conda create -qy -p /venv python=3.12 numcodecs && \
-    echo "Installing dependencies into /venv" && \
+    conda create -qy -p /venv python=3.12 numcodecs
+RUN which gcc
+RUN echo "Installing dependencies into /venv" && \
     cd /_lock && \
     mkdir src && \
     uv sync --no-dev --no-install-project && \
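A plausible reading of this change: one of the synced dependencies (numcodecs is the likely candidate, given the conda line above) compiles C extensions at install time, which is why build-essential and the gcc check are added. A minimal sanity check inside the built image might look like this sketch (assumes /venv is active and numcodecs is installed):

# Sketch: exercise a compiled numcodecs extension to confirm the toolchain worked.
import numcodecs

print(numcodecs.__version__)
print(numcodecs.blosc.list_compressors())  # fails at import/call time if the C build broke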
File renamed without changes.
@@ -21,9 +21,9 @@ def parse_env() -> Adaptors:
         case None:
             log.error("MODEL_REPOSITORY is not set in environment.")
             sys.exit(1)
-        case "ceda-mog":
+        case "ceda":
             model_repository_adaptor = repositories.CedaMetOfficeGlobalModelRepository
-        case "ecmwf-hres-realtime":
+        case "ecmwf-realtime-s3":
             model_repository_adaptor = repositories.ECMWFRealTimeS3ModelRepository
         case _ as model:
             log.error(f"Unknown model: {model}")
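With the renamed keys, the new repository is selected purely through the environment. A usage sketch (variable name from this diff; the surrounding consumer invocation is assumed):

# Sketch: choose the ECMWF realtime adaptor before running the consumer.
import os

os.environ["MODEL_REPOSITORY"] = "ecmwf-realtime-s3"
# parse_env() now picks repositories.ECMWFRealTimeS3ModelRepository;
# any unrecognised value hits the "Unknown model" branch and exits.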
6 changes: 6 additions & 0 deletions src/nwp_consumer/internal/entities/coordinates.py
@@ -137,6 +137,12 @@ def from_pandas(
"as required keys 'init_time', 'step', and 'variable' are not all present. "
f"Got: '{list(pd_indexes.keys())}'",
))
if not all(len(pd_indexes[key].to_list()) > 0 for key in ["init_time", "step", "variable"]):
return Failure(ValueError(
f"Cannot create {cls.__class__.__name__} instance from pandas indexes "
"as the 'init_time', 'step', and 'variable' dimensions must have "
"at least one coordinate value.",
))
input_parameter_set: set[str] = set(pd_indexes["variable"].to_list())
known_parameter_set: set[str] = {str(p) for p in Parameter}
if not input_parameter_set.issubset(known_parameter_set):
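The added guard rejects empty dimension indexes before any further validation. A standalone sketch of the same check against toy pandas indexes (key names taken from the diff):

# Sketch: the non-empty check applied to a dict of pandas indexes.
import pandas as pd

pd_indexes = {
    "init_time": pd.DatetimeIndex(["2024-10-25T00:00"]),
    "step": pd.Index([]),  # empty, so the check fails
    "variable": pd.Index(["t2m"]),
}
print(all(len(pd_indexes[key].to_list()) > 0 for key in ["init_time", "step", "variable"]))
# False -> from_pandas returns a Failure instead of building the map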
@@ -1,14 +1,37 @@
"""Model repository implementation for ECMWF live data from S3.
Documented Structure
--------------------
When getting live or realtime data from ECMWF, grib files are sent by
a data provider to a location of choice, in this case an S3 bucket.
The `ECMWF Dissemination Schedule <https://confluence.ecmwf.int/display/DAC/Dissemination+schedule>`_
describes the naming convention and time ordering for these files:
- A 2-character prefix
- A 1-character dissemination stream indicator
- 8 digits representing the initialization time in the format mmddHHMM
- 8 digits representing the target time in the format mmddHHMM
- 1 digit representing the file number(?)
So a file named `A2D10250000D10260100` would be for an initialization
time of 2024-10-25 00:00 and a target time of 2024-10-26 01:00 (step of 25 hours).
The file contents is specific to the order agreed with the data provider.
For the order that OCF has created, there are four distinct datasets.
This is because OCF has ordered two separate regions and 17 variables,
which are split across two datasets.
Also, some of the data contains larger steps than we are interested in due
to necessities in the order creation process.
"""

import datetime as dt
import functools
import logging
import os
import pathlib
import re
from collections.abc import Callable, Iterator
from typing import override

@@ -101,9 +124,11 @@ def fetch_init_data(self, it: dt.datetime) \
             urls: list[str] = [
                 f"s3://{f}"
                 for f in self._fs.ls(f"{self.bucket}/ecmwf")
-                if it.strftime(
-                    os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2") + "D%m%d%H%M",
-                ) in f
+                if self._wanted_file(
+                    filename=f.split("/")[-1],
+                    it=it,
+                    max_step=max(self.model().expected_coordinates.step),
+                )
             ]
         except Exception as e:
             yield delayed(Failure)(ValueError(
@@ -117,7 +142,7 @@ def fetch_init_data(self, it: dt.datetime) \
             yield delayed(Failure)(ValueError(
                 f"No raw files found for init time '{it.strftime('%Y-%m-%d %H:%M')}' "
                 f"in bucket path '{self.bucket}/ecmwf'. Ensure files exist at the given path "
-                "named with the '...MMDDHHMM...' pattern.",
+                "named with the expected pattern, e.g. 'A2S10250000102603001'.",
             ))

         for url in urls:
@@ -151,9 +176,7 @@ def _download_and_convert(self, url: str) -> ResultE[list[xr.DataArray]]:
         Args:
             url: The URL of the file to download.
         """
-        return self._download(url=url).bind(
-            functools.partial(self._convert, model=self.model()),
-        )
+        return self._download(url=url).bind(self._convert)

     def _download(self, url: str) -> ResultE[pathlib.Path]:
         """Download an ECMWF realtime file from S3.
@@ -198,7 +221,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:
         return Success(local_path)

     @staticmethod
-    def _convert(path: pathlib.Path, model: entities.ModelMetadata) -> ResultE[list[xr.DataArray]]:
+    def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
         """Convert a grib file to an xarray DataArray.

         Args:
@@ -220,12 +243,12 @@ def _convert(path: pathlib.Path, model: entities.ModelMetadata) -> ResultE[list[
             da: xr.DataArray = (
                 ds.pipe(ECMWFRealTimeS3ModelRepository._rename_vars)
                 .rename(name_dict={"time": "init_time"})
-                .expand_dims(dim=["init_time", "step"])
+                .expand_dims(dim="init_time")
+                .expand_dims(dim="step")
                 .to_dataarray(name=ECMWFRealTimeS3ModelRepository.model().name)
             )
             da = (
-                da.where(cond=da["step"] <= max(model.expected_coordinates.step), drop=True)
-                .drop_vars(
+                da.drop_vars(
                     names=[
                         c for c in ds.coords
                         if c not in ["init_time", "step", "variable", "latitude", "longitude"]
@@ -240,10 +263,14 @@ def _convert(path: pathlib.Path, model: entities.ModelMetadata) -> ResultE[list[
                 return Failure(ValueError(
                     f"Error processing dataset {i} from '{path}' to DataArray: {e}",
                 ))
-            # Put each variable into its own dataarray as each file does not contain a full set
-            # and so may not produce a contiguous subset of the expected coordinates
+            # Put each variable into its own DataArray:
+            # * Each raw file does not contain a full set of parameters
+            # * and so may not produce a contiguous subset of the expected coordinates.
             processed_das.extend(
-                [da.where(cond=da["variable"] == v, drop=True) for v in da["variable"].values],
+                [
+                    da.where(cond=da["variable"] == v, drop=True)
+                    for v in da["variable"].values
+                ],
             )

         return Success(processed_das)
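A toy version of the per-variable split, showing why each raw file's variables end up in separate DataArrays (synthetic data; same where/drop pattern as above):

# Sketch: one DataArray per variable via where(..., drop=True).
import numpy as np
import xarray as xr

da = xr.DataArray(
    np.zeros((2, 1, 3)),
    coords={"variable": ["t2m", "u10"], "step": [1], "latitude": [0.0, 0.5, 1.0]},
    dims=["variable", "step", "latitude"],
)
per_var = [da.where(cond=da["variable"] == v, drop=True) for v in da["variable"].values]
print([d["variable"].values for d in per_var])  # [array(['t2m'], ...), array(['u10'], ...)]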
@@ -277,3 +304,29 @@ def _rename_vars(ds: xr.Dataset) -> xr.Dataset:
         if old in ds.data_vars:
             ds = ds.rename({old: new})
         return ds
+
+    @staticmethod
+    def _wanted_file(filename: str, it: dt.datetime, max_step: int) -> bool:
+        """Determine if the file is wanted based on the init time.
+
+        See module docstring for the file naming convention.
+        Returns True if the filename describes data corresponding to the input
+        initialisation time and model metadata.
+
+        Args:
+            filename: The name of the file.
+            it: The init time of the model run.
+            max_step: The maximum step in hours to consider.
+        """
+        prefix: str = os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2")
+        pattern: str = r"^" + prefix + r"[DS](\d{8})(\d{8})\d$"
+        match: re.Match | None = re.search(pattern=pattern, string=filename)
+        if match is None:
+            return False
+        if it.strftime("%m%d%H%M") != match.group(1):
+            return False
+        tt: dt.datetime = dt.datetime.strptime(
+            str(it.year) + match.group(2) + "+0000",
+            "%Y%m%d%H%M%z",
+        )
+        return tt < it + dt.timedelta(hours=max_step)
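Given that implementation, the module docstring's example file passes both checks for its cycle. A sketch of the two comparisons, with a 48-hour cap assumed for illustration (the real cap comes from the model metadata):

# Sketch: the init-time and max-step checks behind _wanted_file.
import datetime as dt

it = dt.datetime(2024, 10, 25, 0, tzinfo=dt.UTC)
target = dt.datetime(2024, 10, 26, 1, tzinfo=dt.UTC)  # decoded from "...10260100..."
print(it.strftime("%m%d%H%M") == "10250000")   # True: filename matches the cycle
print(target < it + dt.timedelta(hours=48))    # True: within the step window, so wanted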
@@ -2,15 +2,17 @@
 import datetime as dt
 import os
 import unittest
+from typing import TYPE_CHECKING

-from returns.pipeline import flow, is_successful
-from returns.pointfree import bind
-import xarray as xr
+from returns.pipeline import is_successful

-from nwp_consumer.internal import entities
-
-from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
 from ...entities import NWPDimensionCoordinateMap
+from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
+
+if TYPE_CHECKING:
+    import xarray as xr
+
+    from nwp_consumer.internal import entities


 class TestECMWFRealTimeS3ModelRepository(unittest.TestCase):
@@ -19,7 +21,7 @@ class TestECMWFRealTimeS3ModelRepository(unittest.TestCase):
     @unittest.skipIf(
         condition="CI" in os.environ,
         reason="Skipping integration test that requires S3 access.",
-    )
+    ) # TODO: Move into integration tests, or remove
     def test__download_and_convert(self) -> None:
         """Test the _download_and_convert method."""

@@ -36,9 +38,7 @@ def test__download_and_convert(self) -> None:
         urls: list[str] = [
             f"s3://{f}"
             for f in c._fs.ls(f"{c.bucket}/ecmwf")
-            if test_it.strftime(
-                os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2") + "D%m%d%H%M",
-            ) in f
+            if c._wanted_file(f.split("/")[-1], test_it, max(c.model().expected_coordinates.step))
         ]

         for url in urls:
@@ -51,4 +51,51 @@ def test__download_and_convert(self) -> None:
             determine_region_result = NWPDimensionCoordinateMap.from_xarray(da).bind(
                 test_coordinates.determine_region,
             )
-            self.assertTrue(is_successful(determine_region_result), msg=f"Error: {determine_region_result}")
+            self.assertTrue(
+                is_successful(determine_region_result),
+                msg=f"Error: {determine_region_result}",
+            )

+    def test__wanted_file(self) -> None:
+        """Test the _wanted_file method."""
+
+        c: ECMWFRealTimeS3ModelRepository = ECMWFRealTimeS3ModelRepository()
+
+        @dataclasses.dataclass
+        class TestCase:
+            name: str
+            filename: str
+            expected: bool
+
+        test_it: dt.datetime = dt.datetime(2024, 10, 25, 12, tzinfo=dt.UTC)
+
+        tests: list[TestCase] = [
+            TestCase(
+                name="valid_filename",
+                filename="A2D10251200102516001",
+                expected=True,
+            ),
+            TestCase(
+                name="invalid_filename",
+                filename="GGC10251200102516002",
+                expected=False,
+            ),
+            TestCase(
+                name="unexpected_extension",
+                filename="A2D10251200102516001.nc",
+                expected=False,
+            ),
+            TestCase(
+                name="step_too_large",
+                filename="A2D10251200102916001",
+                expected=False,
+            ),
+        ]
+
+        for t in tests:
+            with self.subTest(name=t.name):
+                result = c._wanted_file(
+                    filename=t.filename,
+                    it=test_it,
+                    max_step=max(c.model().expected_coordinates.step))
+                self.assertEqual(result, t.expected)
