Skip to content

Commit

Permalink
feat(repositories): Add MetOffice DataHub repository
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 14, 2024
1 parent 2cc2866 commit b1bc468
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 88 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/branch_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ jobs:
uses: docker/setup-buildx-action@v3

- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/main_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ concurrency:
jobs:

# Define an autotagger job that creates tags on changes to master
# Use #major #minor in merge commit messages to bump version beyond patch
# See https://github.com/RueLaLa/auto-tagger?tab=readme-ov-file#usage
tag:
runs-on: ubuntu-latest
if: |
Expand All @@ -34,3 +36,4 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_PR_NUMBER: ${{ github.event.number }}

4 changes: 1 addition & 3 deletions .github/workflows/tagged_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ env:

jobs:

# Job to create a container
# Job for building container image
# * Builds and pushes an OCI Container image to the registry defined in the environment variables
build-container:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
needs: ["lint-typecheck", "test-unit"]

steps:
# Do a non-shallow clone of the repo to ensure tags are present
Expand All @@ -44,7 +42,7 @@ jobs:
uses: docker/setup-buildx-action@v3

- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/entities/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def metadata(self) -> ParameterData:
"incident on the surface expected over the next hour.",
units="W/m^2",
limits=ParameterLimits(upper=1500, lower=0),
alternate_shortnames=["swavr", "ssrd", "dswrf"],
alternate_shortnames=["swavr", "ssrd", "dswrf", "sdswrf"],
)
case self.DOWNWARD_LONGWAVE_RADIATION_FLUX_GL.name:
return ParameterData(
Expand All @@ -146,7 +146,7 @@ def metadata(self) -> ParameterData:
"incident on the surface expected over the next hour.",
units="W/m^2",
limits=ParameterLimits(upper=500, lower=0),
alternate_shortnames=["strd", "dlwrf"],
alternate_shortnames=["strd", "dlwrf", "sdlwrf"],
)
case self.RELATIVE_HUMIDITY_SL.name:
return ParameterData(
Expand Down
29 changes: 28 additions & 1 deletion src/nwp_consumer/internal/ports/repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@

import abc
import datetime as dt
import logging
import pathlib
from collections.abc import Callable, Iterator

import xarray as xr
from returns.result import ResultE
from returns.result import ResultE, Success

from nwp_consumer.internal import entities

log = logging.getLogger("nwp-consumer")


class ModelRepository(abc.ABC):
"""Interface for a repository that produces raw NWP data.
Expand Down Expand Up @@ -121,6 +124,30 @@ def model() -> entities.ModelMetadata:
"""Metadata about the model."""
pass

@staticmethod
def _rename_or_drop_vars(ds: xr.Dataset, allowed_parameters: list[entities.Parameter]) \
-> xr.Dataset:
"""Rename variables to match expected names, dropping invalid ones.
Returns a dataset with all variables in it renamed to a known `entities.Parameter`
name, if a matching parameter exists, and it is an allowed parameter. Otherwise,
the variable is dropped from the dataset.
Args:
ds: The xarray dataset to rename.
allowed_parameters: The list of parameters allowed in the resultant dataset.
"""
for var in ds.data_vars:
param_result = entities.Parameter.try_from_alternate(str(var))
match param_result:
case Success(p):
if p in allowed_parameters:
ds = ds.rename_vars({var: p.value})
continue
log.debug("Dropping invalid parameter '%s' from dataset", var)
ds = ds.drop_vars(str(var))
return ds


class ZarrRepository(abc.ABC):
"""Interface for a repository that stores Zarr NWP data."""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from .metoffice_global import CEDAFTPModelRepository
from .ceda_ftp import CEDAFTPModelRepository
from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
from .noaa_gfs import NOAAS3ModelRepository
from .noaa_s3 import NOAAS3ModelRepository

__all__ = [
"CEDAFTPModelRepository",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ def model() -> entities.ModelMetadata:
@override
def fetch_init_data(self, it: dt.datetime) \
-> Iterator[Callable[..., ResultE[list[xr.DataArray]]]]:
# Ensure class is authenticated
authenticate_result = self.authenticate()
if isinstance(authenticate_result, Failure):
yield delayed(Result.from_failure)(authenticate_result.failure())
return

parameter_stubs: list[str] = [
"Total_Downward_Surface_SW_Flux",
Expand Down Expand Up @@ -252,9 +247,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:
f.write(chunk)
f.flush()
log.debug(
"Downloaded %s to %s (%s bytes)",
url,
local_path,
f"Downloaded '{url}' to '{local_path}' (%s bytes)",
local_path.stat().st_size,
)
except Exception as e:
Expand All @@ -268,7 +261,7 @@ def _download(self, url: str) -> ResultE[pathlib.Path]:

@staticmethod
def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
"""Convert a file to an xarray dataset.
"""Convert a local grib file to xarray DataArrays.
Args:
path: The path to the file to convert.
Expand Down Expand Up @@ -309,25 +302,3 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
),
)
return Success([da])


@staticmethod
def _rename_or_drop_vars(ds: xr.Dataset, allowed_parameters: list[entities.Parameter]) \
-> xr.Dataset:
"""Rename variables to match the expected names, dropping invalid ones.
Args:
ds: The xarray dataset to rename.
allowed_parameters: The list of parameters allowed in the resultant dataset.
"""
for var in ds.data_vars:
param_result = entities.Parameter.try_from_alternate(str(var))
match param_result:
case Success(p):
if p in allowed_parameters:
ds = ds.rename_vars({var: p.value})
continue
log.warning("Dropping invalid parameter '%s' from dataset", var)
ds = ds.drop_vars(str(var))
return ds

Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,10 @@ def fetch_init_data(self, it: dt.datetime) \
f"in bucket path '{self.bucket}/ecmwf'. Ensure files exist at the given path "
"named with the expected pattern, e.g. 'A2S10250000102603001.",
))
return

log.debug(
f"Found {len(urls)} files for init time '{it.strftime('%Y-%m-%d %H:%M')}' "
f"Found {len(urls)} file(s) for init time '{it.strftime('%Y-%m-%d %H:%M')}' "
f"in bucket path '{self.bucket}/ecmwf'.",
)
for url in urls:
Expand Down Expand Up @@ -309,25 +310,3 @@ def _wanted_file(filename: str, it: dt.datetime, max_step: int) -> bool:
"%Y%m%d%H%M%z",
)
return tt < it + dt.timedelta(hours=max_step)


@staticmethod
def _rename_or_drop_vars(ds: xr.Dataset, allowed_parameters: list[entities.Parameter]) \
-> xr.Dataset:
"""Rename variables to match the expected names, dropping invalid ones.
Args:
ds: The xarray dataset to rename.
allowed_parameters: The list of parameters allowed in the resultant dataset.
"""
for var in ds.data_vars:
param_result = entities.Parameter.try_from_alternate(str(var))
match param_result:
case Success(p):
if p in allowed_parameters:
ds = ds.rename_vars({var: p.value})
continue
log.warning("Dropping invalid parameter '%s' from dataset", var)
ds = ds.drop_vars(str(var))
return ds

Loading

0 comments on commit b1bc468

Please sign in to comment.