Skip to content

Commit

Permalink
feat(ecmwf): Enable support for new realtime datasets (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Oct 17, 2024
1 parent bd12126 commit b71c97d
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions src/nwp_consumer/internal/inputs/ecmwf/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,23 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM
"""Overrides the corresponding method in the parent class."""
allFiles: list[str] = self.__fs.ls((self.bucket / self.bucketPath).as_posix())
# List items are of the form "bucket/folder/filename, so extract just the filename

fileprefix: str
match self.area:
case "uk":
fileprefix = "A1D"
case "nw-india":
fileprefix = "A1D"
case "india":
fileprefix = "T1D"
case _:
log.warn(event="Unknown area", area=self.area)
return []

initTimeFiles: list[internal.FileInfoModel] = [
ECMWFLiveFileInfo(fname=pathlib.Path(file).name)
for file in allFiles
if it.strftime("A1D%m%d%H") in file
if it.strftime(f"{fileprefix}%m%d%H") in file
]
return initTimeFiles

Expand Down Expand Up @@ -120,6 +133,13 @@ def mapCachedRaw(self, *, p: pathlib.Path) -> xr.Dataset:
"""Overrides the corresponding method in the parent class."""
all_dss: list[xr.Dataset] = cfgrib.open_datasets(p.as_posix())
area_dss: list[xr.Dataset] = _filterDatasetsByArea(all_dss, self.area)
if len(area_dss) == 0:
log.error(
event="No datasets found for area",
area=self.area,
file=p,
file_datasets=len(all_dss),
)
ds: xr.Dataset = xr.merge(area_dss, combine_attrs="drop_conflicts")
del area_dss, all_dss

Expand Down Expand Up @@ -180,10 +200,13 @@ def parameterConformMap(self) -> dict[str, OCFParameter]:

def _filterDatasetsByArea(dss: list[xr.Dataset], area: str) -> list[xr.Dataset]:
"""Filters a list of datasets by area."""
if area == "uk":
return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 60, dss))
elif area == "nw-india":
return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 31, dss))
else:
log.warn(event="Unknown area", area=area)
return []
match area:
case "uk":
return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 60, dss))
case "nw-india":
return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 31, dss))
case "india":
return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 35, dss))
case _:
log.warn(event="Unknown area", area=area)
return []

0 comments on commit b71c97d

Please sign in to comment.