From b71c97d6ef4dbf61943f997bf5d64ea49b73d8e6 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 17 Oct 2024 14:22:22 +0100 Subject: [PATCH] feat(ecmwf): Enable support for new realtime datasets (#188) --- src/nwp_consumer/internal/inputs/ecmwf/s3.py | 39 ++++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/src/nwp_consumer/internal/inputs/ecmwf/s3.py b/src/nwp_consumer/internal/inputs/ecmwf/s3.py index b81168f6..e3663dd9 100644 --- a/src/nwp_consumer/internal/inputs/ecmwf/s3.py +++ b/src/nwp_consumer/internal/inputs/ecmwf/s3.py @@ -78,10 +78,23 @@ def listRawFilesForInitTime(self, *, it: dt.datetime) -> list[internal.FileInfoM """Overrides the corresponding method in the parent class.""" allFiles: list[str] = self.__fs.ls((self.bucket / self.bucketPath).as_posix()) # List items are of the form "bucket/folder/filename, so extract just the filename + + fileprefix: str + match self.area: + case "uk": + fileprefix = "A1D" + case "nw-india": + fileprefix = "A1D" + case "india": + fileprefix = "T1D" + case _: + log.warn(event="Unknown area", area=self.area) + return [] + initTimeFiles: list[internal.FileInfoModel] = [ ECMWFLiveFileInfo(fname=pathlib.Path(file).name) for file in allFiles - if it.strftime("A1D%m%d%H") in file + if it.strftime(f"{fileprefix}%m%d%H") in file ] return initTimeFiles @@ -120,6 +133,13 @@ def mapCachedRaw(self, *, p: pathlib.Path) -> xr.Dataset: """Overrides the corresponding method in the parent class.""" all_dss: list[xr.Dataset] = cfgrib.open_datasets(p.as_posix()) area_dss: list[xr.Dataset] = _filterDatasetsByArea(all_dss, self.area) + if len(area_dss) == 0: + log.error( + event="No datasets found for area", + area=self.area, + file=p, + file_datasets=len(all_dss), + ) ds: xr.Dataset = xr.merge(area_dss, combine_attrs="drop_conflicts") del area_dss, all_dss @@ -180,10 +200,13 @@ def parameterConformMap(self) -> dict[str, OCFParameter]: def _filterDatasetsByArea(dss: list[xr.Dataset], area: str) -> list[xr.Dataset]: """Filters a list of datasets by area.""" - if area == "uk": - return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 60, dss)) - elif area == "nw-india": - return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 31, dss)) - else: - log.warn(event="Unknown area", area=area) - return [] + match area: + case "uk": + return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 60, dss)) + case "nw-india": + return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 31, dss)) + case "india": + return list(filter(lambda ds: ds.coords["latitude"].as_numpy().max() == 35, dss)) + case _: + log.warn(event="Unknown area", area=area) + return []