Skip to content

Commit

Permalink
feat(main): Add MetOffice to cmd module
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 15, 2024
1 parent 4066c3b commit eb0e007
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 68 deletions.
20 changes: 15 additions & 5 deletions src/nwp_consumer/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,31 @@ def parse_env() -> Adaptors:
"""Parse from the environment."""
model_repository_adaptor: type[ports.ModelRepository]
match os.getenv("MODEL_REPOSITORY"):
# Default to NOAA S3 as it is freely accessible
case None | "gfs":
model_repository_adaptor = repositories.NOAAS3ModelRepository
model_repository_adaptor = \
repositories.model_repositories.NOAAS3ModelRepository
case "ceda":
model_repository_adaptor = repositories.CEDAFTPModelRepository
model_repository_adaptor = \
repositories.model_repositories.CEDAFTPModelRepository
case "ecmwf-realtime":
model_repository_adaptor = repositories.ECMWFRealTimeS3ModelRepository
model_repository_adaptor = \
repositories.model_repositories.ECMWFRealTimeS3ModelRepository
case "metoffice-datahub":
model_repository_adaptor = \
repositories.model_repositories.MetOfficeDatahubModelRepository
case _ as model:
log.error(f"Unknown model: {model}")
sys.exit(1)

notification_repository_adaptor: type[ports.NotificationRepository]
match os.getenv("NOTIFICATION_REPOSITORY", "stdout"):
case "stdout":
notification_repository_adaptor = repositories.StdoutNotificationRepository
notification_repository_adaptor = \
repositories.notification_repositories.StdoutNotificationRepository
case "dagster-pipes":
notification_repository_adaptor = repositories.DagsterPipesNotificationRepository
notification_repository_adaptor = \
repositories.notification_repositories.DagsterPipesNotificationRepository
case _ as notification:
log.error(f"Unknown notification repository: {notification}")
sys.exit(1)
Expand All @@ -46,6 +55,7 @@ def parse_env() -> Adaptors:

def run_cli() -> None:
"""Entrypoint for the CLI handler."""
# TODO: InfoUseCase
adaptors = parse_env()
c = handlers.CLIHandler(
consumer_usecase=services.ConsumerService(
Expand Down
29 changes: 29 additions & 0 deletions src/nwp_consumer/internal/entities/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
"""

import dataclasses
import logging
from enum import StrEnum, auto

import xarray as xr
from returns.result import Failure, ResultE, Success

log = logging.getLogger("nwp-consumer")


@dataclasses.dataclass(slots=True)
class ParameterLimits:
Expand Down Expand Up @@ -325,3 +329,28 @@ def try_from_alternate(name: str) -> ResultE["Parameter"]:
return Success(p)
return Failure(ValueError(f"Unknown shortname: {name}"))

@staticmethod
def rename_else_drop_ds_vars(
ds: xr.Dataset, allowed_parameters: list["Parameter"],
) -> xr.Dataset:
"""Rename variables to match expected names, dropping invalid ones.
Returns a dataset with all variables in it renamed to a known `entities.Parameter`
name, if a matching parameter exists, and it is an allowed parameter. Otherwise,
the variable is dropped from the dataset.
Args:
ds: The xarray dataset to rename.
allowed_parameters: The list of parameters allowed in the resultant dataset.
"""
for var in ds.data_vars:
param_result = Parameter.try_from_alternate(str(var))
match param_result:
case Success(p):
if p in allowed_parameters:
ds = ds.rename_vars({var: p.value})
continue
log.debug("Dropping invalid parameter '%s' from dataset", var)
ds = ds.drop_vars(str(var))
return ds

38 changes: 38 additions & 0 deletions src/nwp_consumer/internal/entities/test_parameters.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import unittest

import numpy as np
import xarray as xr
from hypothesis import given
from hypothesis import strategies as st
from returns.pipeline import is_successful
Expand All @@ -25,6 +27,42 @@ def test_try_from_shortname(self, shortname: str) -> None:
p = Parameter.try_from_alternate("invalid")
self.assertFalse(is_successful(p))

@given(
st.sampled_from([s for p in Parameter for s in p.metadata().alternate_shortnames]),
st.sampled_from(Parameter),
)
def test_rename_else_drop_ds_vars(self, shortname: str, parameter: Parameter) -> None:
"""Test the rename_else_drop_ds_vars method."""
allowed_parameters: list[Parameter] = [parameter]

ds = xr.Dataset(
data_vars={
shortname: (
("init_time", "step", "latitude", "longitude"), np.random.rand(1, 12, 15, 15),
),
"unknown-parameter": (
("init_time", "step", "latitude", "longitude"), np.random.rand(1, 12, 15, 15),
),
},
coords={
"init_time": np.array([0]),
"step": np.array(range(12)),
"latitude": np.array(range(15)),
"longitude": np.array(range(15)),
},
)

ds = Parameter.rename_else_drop_ds_vars(
ds,
allowed_parameters=allowed_parameters,
)

if shortname in parameter.metadata().alternate_shortnames:
self.assertTrue(len(list(ds.data_vars)) == 1)
self.assertEqual(next(iter(ds.data_vars)), str(parameter))
else:
self.assertTrue(len(list(ds.data_vars)) == 0)


if __name__ == "__main__":
unittest.main()
28 changes: 2 additions & 26 deletions src/nwp_consumer/internal/ports/repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from collections.abc import Callable, Iterator

import xarray as xr
from returns.result import ResultE, Success
from returns.result import ResultE

from nwp_consumer.internal import entities

Expand All @@ -30,7 +30,7 @@ class ModelRepository(abc.ABC):
Since different producers of NWP data have different data storage
implementations, a ModelRepository needs to define its own download
and processing methods.
and processing methods.
A source may provide one or more files for a given init time.
To keep memory usage at a minimum, when converting raw data to zarr,
Expand Down Expand Up @@ -124,30 +124,6 @@ def model() -> entities.ModelMetadata:
"""Metadata about the model."""
pass

@staticmethod
def _rename_or_drop_vars(ds: xr.Dataset, allowed_parameters: list[entities.Parameter]) \
-> xr.Dataset:
"""Rename variables to match expected names, dropping invalid ones.
Returns a dataset with all variables in it renamed to a known `entities.Parameter`
name, if a matching parameter exists, and it is an allowed parameter. Otherwise,
the variable is dropped from the dataset.
Args:
ds: The xarray dataset to rename.
allowed_parameters: The list of parameters allowed in the resultant dataset.
"""
for var in ds.data_vars:
param_result = entities.Parameter.try_from_alternate(str(var))
match param_result:
case Success(p):
if p in allowed_parameters:
ds = ds.rename_vars({var: p.value})
continue
log.debug("Dropping invalid parameter '%s' from dataset", var)
ds = ds.drop_vars(str(var))
return ds


class ZarrRepository(abc.ABC):
"""Interface for a repository that stores Zarr NWP data."""
Expand Down
26 changes: 18 additions & 8 deletions src/nwp_consumer/internal/ports/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
class ConsumeUseCase(abc.ABC):
"""Interface for the consumer use case.
Defines the business-critical methods for the following use case:
Defines the business-critical methods for the following use cases:
'A user should be able to consume NWP data for a given initialization time.'
- 'A user should be able to consume NWP data for a given initialization time.'
"""


Expand Down Expand Up @@ -55,9 +55,9 @@ def postprocess(self, options: entities.PostProcessOptions) -> ResultE[str]:
class ArchiveUseCase(abc.ABC):
"""Interface for the archive use case.
Defines the following business-critical methods:
Defines the business-critical methods for the following use cases:
'A user should be able to archive NWP data for a given time period.'
- 'A user should be able to archive NWP data for a given time period.'
"""

@abc.abstractmethod
Expand All @@ -73,15 +73,25 @@ def archive(self, year: int, month: int) -> ResultE[pathlib.Path]:
"""
pass

class InformUseCase(abc.ABC):
class InfoUseCase(abc.ABC):
"""Interface for the notification use case.
Defines the following business-critical methods:
Defines the business-critical methods for the following use cases:
'A user should be able to retrieve information about the service.'
- 'A user should be able to retrieve information about the service.'
"""

@abc.abstractmethod
def model_info(self) -> str:
def available_models(self) -> list[str]:
"""Get a list of available models."""
pass

@abc.abstractmethod
def model_repository_info(self) -> str:
"""Get information about the model repository."""
pass

@abc.abstractmethod
def model_info(self) -> str:
"""Get information about the model."""
pass
24 changes: 9 additions & 15 deletions src/nwp_consumer/internal/repositories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,25 @@
- a message queue
- a filesystem
Since they are stores of data, they are referred to in this package
(and often in hexagonal architecture documentation) as *repositories*.
This module
-----------
This module contains implementations for the following driven actors:
- Notification Repository - Somewhere to send notifications to
- Model Repository - Source of NWP data
- Model Repository - A source of NWP data
Both inherit from the repository ports specified in the core via `nwp_consumer.internal.ports`.
"""

from .model_repositories import (
CEDAFTPModelRepository,
ECMWFRealTimeS3ModelRepository,
NOAAS3ModelRepository,
)
from .notification_repositories import (
StdoutNotificationRepository,
DagsterPipesNotificationRepository,
from . import (
model_repositories,
notification_repositories,
)

__all__ = [
"CEDAFTPModelRepository",
"ECMWFRealTimeS3ModelRepository",
"NOAAS3ModelRepository",
"StdoutNotificationRepository",
"DagsterPipesNotificationRepository",
"model_repositories",
"notification_repositories",
]
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
"""Model Repositories
TODO: Add description
"""

from .ceda_ftp import CEDAFTPModelRepository
from .ecmwf_realtime import ECMWFRealTimeS3ModelRepository
from .noaa_s3 import NOAAS3ModelRepository
from .mo_datahub import MetOfficeDatahubModelRepository

__all__ = [
"CEDAFTPModelRepository",
"ECMWFRealTimeS3ModelRepository",
"NOAAS3ModelRepository",
"MetOfficeDatahubModelRepository",
]

Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
)
try:
da: xr.DataArray = (
CEDAFTPModelRepository._rename_or_drop_vars(
entities.Parameter.rename_else_drop_ds_vars(
ds=ds,
allowed_parameters=CEDAFTPModelRepository.model().expected_coordinates.variable,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""Model repository implementation for ECMWF live data from S3.
Repository Information
======================
Documented Structure
--------------------
Expand Down Expand Up @@ -248,7 +251,7 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
for i, ds in enumerate(dss):
try:
da: xr.DataArray = (
ECMWFRealTimeS3ModelRepository._rename_or_drop_vars(
entities.Parameter.rename_else_drop_ds_vars(
ds=ds,
allowed_parameters=ECMWFRealTimeS3ModelRepository.model().expected_coordinates.variable,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
"""Repository implementation for data from MetOffice's DataHub service.
Repository Information
======================
The API documentation for the MetOffice Weather Datahub can be found at:
https://datahub.metoffice.gov.uk/docs/f/category/atmospheric/type/atmospheric/api-documentation
Documented Structure
--------------------
TODO: Document filestructure
"""

import datetime as dt
Expand Down Expand Up @@ -88,12 +96,7 @@ def model() -> entities.ModelMetadata:
float(f"{lat:.4f}") for lat in np.arange(89.856, -89.856 - 0.156, -0.156)
],
longitude=[
float(f"{lon:.4f}") for lon in np.concatenate([
np.arange(-45, 45, 0.234),
np.arange(45, 135, 0.234),
np.arange(135, 225, 0.234),
np.arange(225, 315, 0.234),
])
float(f"{lon:.4f}") for lon in np.concatenate([np.arange(-179.87, 180, 0.234)])
],
),
)
Expand Down Expand Up @@ -270,7 +273,10 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:

try:
da: xr.DataArray = (
ds.pipe(MetOfficeDatahubModelRepository._rename_or_drop_vars )
ds.pipe(
entities.Parameter.rename_else_drop_ds_vars,
allowed_parameters=MetOfficeDatahubModelRepository.model().expected_coordinates.variable,
)
.rename(name_dict={"time": "init_time"})
.expand_dims(dim="init_time")
.expand_dims(dim="step")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
This module contains the implementation of the model repository for the
NOAA GFS data stored in an S3 bucket.
Repository Information
======================
TODO: provide links etc
Documented Structure
--------------------
TODO: document filestructure
"""

import datetime as dt
Expand Down Expand Up @@ -227,7 +237,7 @@ def _convert(path: pathlib.Path) -> ResultE[list[xr.DataArray]]:
processed_das: list[xr.DataArray] = []
for i, ds in enumerate(dss):
try:
ds = NOAAS3ModelRepository._rename_or_drop_vars(
ds = entities.Parameter.rename_else_drop_ds_vars(
ds=ds,
allowed_parameters=NOAAS3ModelRepository.model().expected_coordinates.variable,
)
Expand Down
Loading

0 comments on commit eb0e007

Please sign in to comment.