Skip to content

Commit

Permalink
feat(ecmwf_realtime): Bit more debugging to do
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Oct 25, 2024
1 parent 26af2f0 commit f07b48c
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 61 deletions.
3 changes: 1 addition & 2 deletions Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ ENV UV_LINK_MODE=copy \
UV_COMPILE_BYTECODE=1 \
UV_PYTHON_DOWNLOADS=never \
UV_PYTHON=python3.12 \
UV_PROJECT_ENVIRONMENT=/venv \
PATH=/venv/bin:$PATH
UV_PROJECT_ENVIRONMENT=/venv
COPY pyproject.toml /_lock/

# Synchronize DEPENDENCIES without the application itself.
Expand Down
13 changes: 11 additions & 2 deletions src/nwp_consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@

if sys.stdout.isatty():
# Simple logging for terminals
_formatstr="%(levelname)s | %(message)s"
_formatstr="%(levelname)s [%(name)s] | %(message)s"
else:
# JSON logging for containers
_formatstr="".join((
Expand All @@ -143,5 +143,14 @@
datefmt="%Y-%m-%dT%H:%M:%S",
)

for logger in ["numcodecs", "numexpr", "cfgrib"]:
for logger in [
"numcodecs",
"numexpr",
"gribapi",
"aiobotocore",
"s3fs",
"asyncio",
"botocore",
"cfgrib",
]:
logging.getLogger(logger).setLevel(logging.WARNING)
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ def parse_env() -> Adaptors:
case None:
log.error("MODEL_REPOSITORY is not set in environment.")
sys.exit(1)
case "ceda-metoffice-global":
case "ceda-mog":
model_repository_adaptor = repositories.CedaMetOfficeGlobalModelRepository
case "ecmwf-hres-realtime":
model_repository_adaptor = repositories.ECMWFRealTimeS3ModelRepository
case _ as model:
log.error(f"Unknown model: {model}")
sys.exit(1)
Expand Down
8 changes: 5 additions & 3 deletions src/nwp_consumer/internal/entities/coordinates.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def from_pandas(
f"Cannot create {cls.__class__.__name__} instance from pandas indexes "
f"as unknown index/dimension keys were encountered: {unknown_keys}.",
))
# TODO: Ensure correct ordering of lat/long?

# Convert the pandas Index objects to lists of the appropriate types
return Success(
Expand Down Expand Up @@ -307,8 +308,8 @@ def determine_region(
ValueError(
f"Coordinate values for dimension '{inner_dim_label}' in the inner map "
"exceed the number of coordinate values in the outer map. "
f"Got: {len(inner_dim_coords)}' (> {len(outer_dim_coords)}) "
"coordinate values.",
f"Got: {len(inner_dim_coords)} (> {len(outer_dim_coords)}) "
f"coordinate values.",
),
)
if not set(inner_dim_coords).issubset(set(outer_dim_coords)):
Expand Down Expand Up @@ -342,7 +343,8 @@ def determine_region(
ValueError(
f"Coordinate values for dimension '{inner_dim_label}' do not correspond "
f"with a contiguous index set in the outer dimension map. "
f"Non-contiguous values '{[outer_dim_indices[i] for i in idxs]}' "
f"Non-contiguous values '{[outer_dim_coords[i] for i in idxs]} "
f"(index {[outer_dim_indices[i] for i in idxs]})' "
f"adjacent in dimension coordinates.",
),
)
Expand Down
2 changes: 2 additions & 0 deletions src/nwp_consumer/internal/repositories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from .model_repositories import (
CedaMetOfficeGlobalModelRepository,
ECMWFRealTimeS3ModelRepository,
)
from .notification_repositories import (
StdoutNotificationRepository,
Expand All @@ -33,6 +34,7 @@

__all__ = [
"CedaMetOfficeGlobalModelRepository",
"ECMWFRealTimeS3ModelRepository",
"StdoutNotificationRepository",
"DagsterPipesNotificationRepository",
]
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from .metoffice_global import CedaMetOfficeGlobalModelRepository
from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository

__all__ = ["CedaMetOfficeGlobalModelRepository"]
__all__ = [
"CedaMetOfficeGlobalModelRepository",
"ECMWFRealTimeS3ModelRepository",
]

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import datetime as dt
import functools
import logging
import os
import pathlib
Expand Down Expand Up @@ -65,15 +66,16 @@ def model() -> entities.ModelMetadata:
resolution="0.1 degrees",
expected_coordinates=entities.NWPDimensionCoordinateMap(
init_time=[],
step=list(range(0, 84, 1)),
variable=[
step=list(range(0, 85, 1)),
variable=sorted([
entities.Parameter.WIND_U_COMPONENT_10m,
entities.Parameter.WIND_V_COMPONENT_10m,
entities.Parameter.WIND_U_COMPONENT_100m,
entities.Parameter.WIND_V_COMPONENT_100m,
entities.Parameter.WIND_U_COMPONENT_200m,
entities.Parameter.WIND_V_COMPONENT_200m,
entities.Parameter.TEMPERATURE_SL,
entities.Parameter.TOTAL_PRECIPITATION_RATE_GL,
entities.Parameter.DOWNWARD_SHORTWAVE_RADIATION_FLUX_GL,
entities.Parameter.DOWNWARD_LONGWAVE_RADIATION_FLUX_GL,
entities.Parameter.CLOUD_COVER_HIGH,
Expand All @@ -82,7 +84,9 @@ def model() -> entities.ModelMetadata:
entities.Parameter.CLOUD_COVER_TOTAL,
entities.Parameter.SNOW_DEPTH_GL,
entities.Parameter.VISIBILITY_SL,
],
entities.Parameter.DIRECT_SHORTWAVE_RADIATION_FLUX_GL,
entities.Parameter.DOWNWARD_ULTRAVIOLET_RADIATION_FLUX_GL,
]),
latitude=[float(f"{lat / 10:.2f}") for lat in range(900, -900 - 1, -1)],
longitude=[float(f"{lon / 10:.2f}") for lon in range(-1800, 1800 + 1, 1)],
),
Expand All @@ -95,9 +99,11 @@ def fetch_init_data(self, it: dt.datetime) \
# List relevant files in the S3 bucket
try:
urls: list[str] = [
f"s3://{self.bucket}/ecmwf/{f}"
f"s3://{f}"
for f in self._fs.ls(f"{self.bucket}/ecmwf")
if it.strftime("%m%d%H%M") in f
if it.strftime(
os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2") + "D%m%d%H%M",
) in f
]
except Exception as e:
yield delayed(Failure)(ValueError(
Expand Down Expand Up @@ -145,56 +151,58 @@ def _download_and_convert(self, url: str) -> ResultE[list[xr.DataArray]]:
Args:
url: The URL of the file to download.
"""
return self._download(url=url).bind(self._convert)
return self._download(url=url).bind(
functools.partial(self._convert, model=self.model()),
)

def _download(self, url: str) -> ResultE[pathlib.Path]:
"""Download an ECMWF realtime file from S3.
Args:
url: The URL to the S3 object.
"""
if self.bucket is None or self._fs is None:
return Failure(ConnectionError(
"Attempted to download file from S3 while not authenticated. "
"Ensure the 'authenticate' method has been called prior to download.",
))

local_path: pathlib.Path = (
pathlib.Path(
os.getenv(
"RAWDIR",
f"~/.local/cache/nwp/{self.repository().name}/{self.model().name}/raw",
),
) / url.split("/")[-1]
)
).with_suffix(".grib").expanduser()

# Only download the file if not already present
if not local_path.exists():
log.debug("Requesting file from S3 at: '%s'", url)
local_path.parent.mkdir(parents=True, exist_ok=True)
log.info("Requesting file from S3 at: '%s'", url)

try:
if not self._fs.exists(url):
raise FileNotFoundError(f"File not found at '{url}'")

with local_path.open("wb") as lf, self._fs.open(url, "rb") as rf:
for chunk in iter(lambda: rf.read(12 * 1024), b""):
lf.write(chunk)
lf.flush()

if local_path.stat().st_size != self._fs.info(url)["size"]:
raise ValueError(
f"Failed to download file from S3 at '{url}'. "
"File size mismatch. File may be corrupted.",
)

except Exception as e:
return Failure(OSError(
f"Failed to download file from S3 at '{url}'. Encountered error: {e}",
))

if local_path.stat().st_size != self._fs.info(url)["size"]:
return Failure(ValueError(
f"Failed to download file from S3 at '{url}'. "
"File size mismatch. File may be corrupted.",
))

return Success(local_path)

@staticmethod
def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
def _convert(path: pathlib.Path, model: entities.ModelMetadata) -> ResultE[list[xr.DataArray]]:
"""Convert a grib file to an xarray DataArray.
Args:
model: Metadata of the model producing the data
path: The path to the grib file.
"""
try:
Expand All @@ -209,39 +217,43 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
processed_das: list[xr.DataArray] = []
for i, ds in enumerate(dss):
try:
da = xr.DataArray(
ds.drop_vars(
da: xr.DataArray = (
ds.pipe(ECMWFRealTimeS3ModelRepository._rename_vars)
.rename(name_dict={"time": "init_time"})
.expand_dims(dim=["init_time", "step"])
.to_dataarray(name=ECMWFRealTimeS3ModelRepository.model().name)
)
da = (
da.where(cond=da["step"] <= max(model.expected_coordinates.step), drop=True)
.drop_vars(
names=[
v for v in ds.coords
if v not in ["time", "step", "latitude", "longitude"]
c for c in ds.coords
if c not in ["init_time", "step", "variable", "latitude", "longitude"]
],
errors="ignore",

)
.rename({"time": "init_time"})
.expand_dims("init_time")
.expand_dims("step")
.pipe(ECMWFRealTimeS3ModelRepository._rename_vars)
.to_dataarray(name=ECMWFRealTimeS3ModelRepository.repository().name)
.transpose("init_time", "step", "latitude", "longitude")
.sortby("step"),
.transpose("init_time", "step", "variable", "latitude", "longitude")
.sortby(variables=["step", "variable", "longitude"])
.sortby(variables="latitude", ascending=False)
)
except Exception as e:
return Failure(ValueError(
f"Error processing dataset {i} from '{path}' to DataArray: {e}",
))
processed_das.append(da)
del ds[i]
return Success(processed_das)
            # Split each variable out into its own DataArray: a single file carries only a
            # partial set of variables, so a combined array may not form a contiguous
            # subset of the expected coordinates.
processed_das.extend(
[da.where(cond=da["variable"] == v, drop=True) for v in da["variable"].values],
)

pass
return Success(processed_das)

@staticmethod
def _rename_vars(ds: xr.Dataset) -> xr.Dataset:
"""Rename variables to match the expected names."""
rename_map: dict[str, str] = {
"dsrp": entities.Parameter.DIRECT_SHORTWAVE_RADIATION_FLUX_GL.value,
"uvb": entities.Parameter.DOWNWARD_ULTRAVIOLET_RADIATION_FLUX_GL,
"uvb": entities.Parameter.DOWNWARD_ULTRAVIOLET_RADIATION_FLUX_GL.value,
"sd": entities.Parameter.SNOW_DEPTH_GL.value,
"tcc": entities.Parameter.CLOUD_COVER_TOTAL.value,
"clt": entities.Parameter.CLOUD_COVER_TOTAL.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:
f"~/.local/cache/nwp/{self.repository().name}/{self.model().name}/raw",
),
) / url.split("/")[-1]
)
).expanduser()

# Don't download the file if it already exists
if not local_path.exists():
local_path.parent.mkdir(parents=True, exist_ok=True)
log.debug("Sending request to CEDA FTP server for: '%s'", url)
try:
response = urllib.request.urlopen( # noqa: S310
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import dataclasses
import datetime as dt
import os
import unittest

from returns.pipeline import flow, is_successful

Check failure on line 6 in src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (F401)

src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py:6:30: F401 `returns.pipeline.flow` imported but unused
from returns.pointfree import bind

Check failure on line 7 in src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (F401)

src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py:7:31: F401 `returns.pointfree.bind` imported but unused
import xarray as xr

Check failure on line 8 in src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (TCH002)

src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py:8:18: TCH002 Move third-party import `xarray` into a type-checking block

from nwp_consumer.internal import entities

Check failure on line 10 in src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (TCH001)

src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py:10:35: TCH001 Move application import `nwp_consumer.internal.entities` into a type-checking block

from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
from ...entities import NWPDimensionCoordinateMap


class TestECMWFRealTimeS3ModelRepository(unittest.TestCase):

Check failure on line 16 in src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (I001)

src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py:1:1: I001 Import block is un-sorted or un-formatted
"""Test the business methods of the ECMWFRealTimeS3ModelRepository class."""

@unittest.skipIf(
condition="CI" in os.environ,
reason="Skipping integration test that requires S3 access.",
)
def test__download_and_convert(self) -> None:
"""Test the _download_and_convert method."""

auth_result = ECMWFRealTimeS3ModelRepository.authenticate()
self.assertTrue(is_successful(auth_result), msg=f"Error: {auth_result.failure}")
c: ECMWFRealTimeS3ModelRepository = auth_result.unwrap()

test_it: dt.datetime = dt.datetime(2024, 10, 25, 0, tzinfo=dt.UTC)
test_coordinates: entities.NWPDimensionCoordinateMap = dataclasses.replace(
c.model().expected_coordinates,
init_time=[test_it],
)

urls: list[str] = [
f"s3://{f}"
for f in c._fs.ls(f"{c.bucket}/ecmwf")
if test_it.strftime(
os.getenv("ECMWF_DISSEMINATION_REALTIME_FILE_PREFIX", "A2") + "D%m%d%H%M",
) in f
]

for url in urls:
with (self.subTest(url=url)):
result = c._download_and_convert(url)

self.assertTrue(is_successful(result), msg=f"Error: {result}")

da: xr.DataArray = result.unwrap()[0]
determine_region_result = NWPDimensionCoordinateMap.from_xarray(da).bind(
test_coordinates.determine_region,
)
self.assertTrue(is_successful(determine_region_result), msg=f"Error: {determine_region_result}")

Check failure on line 54 in src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (E501)

src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py:54:101: E501 Line too long (112 > 100)

Check failure on line 54 in src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py

View workflow job for this annotation

GitHub Actions / lint-typecheck

Ruff (W292)

src/nwp_consumer/internal/repositories/model_repositories/test_ecmwf_realtime.py:54:113: W292 No newline at end of file
Loading

0 comments on commit f07b48c

Please sign in to comment.