Skip to content

Commit

Permalink
fix(tensorstore): Enable using existing stores
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 22, 2024
1 parent e3c016f commit b478038
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 240 deletions.
3 changes: 2 additions & 1 deletion src/nwp_consumer/internal/entities/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ def metadata(self) -> ParameterData:
name=str(self),
description="Downward shortwave radiation flux at ground level. "
"Defined as the mean amount of solar radiation "
"incident on the surface expected over the next hour.",
"incident on the surface expected over the next hour. "
"This is made up of both direct and diffuse radiation.",
units="W/m^2",
limits=ParameterLimits(upper=1500, lower=0),
alternate_shortnames=["swavr", "ssrd", "dswrf", "sdswrf"],
Expand Down
135 changes: 105 additions & 30 deletions src/nwp_consumer/internal/entities/tensorstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
chunked appropriately.
This module provides a class for storing metadata about a Zarr store.
TODO: 2024-11-20 This module wants refactoring into smaller testable components.
"""

import abc
Expand All @@ -13,6 +15,8 @@
import logging
import os
import pathlib
import shutil
from collections.abc import MutableMapping
from typing import Any

import pandas as pd
Expand Down Expand Up @@ -79,7 +83,8 @@ def initialize_empty_store(
This method writes a blank dataarray to disk based on the input coordinates,
which define the dimension labels and tick values of the output dataset object.
.. note: If a store already exists at the expected path, it will be overwritten!
.. note: If a store already exists at the expected path,
it is checked for consistency with the input coordinates and used if valid.
The dataarray is 'blank' because it is written via::
Expand Down Expand Up @@ -126,47 +131,27 @@ def initialize_empty_store(
),
)


zarrdir = os.getenv("ZARRDIR", f"~/.local/cache/nwp/{repository}/{model}/data")
store: zarr.storage.Store
path: str
filename: str = TensorStore.gen_store_filename(coords=coords)
try:
path = pathlib.Path(
"/".join((zarrdir, TensorStore.gen_store_filename(coords=coords))),
).expanduser().as_posix()
store = zarr.storage.DirectoryStore(path)
if zarrdir.startswith("s3"):
import s3fs
log.debug("Attempting AWS connection using credential discovery")
try:
fs = s3fs.S3FileSystem(
anon=False,
client_kwargs={
"region_name": os.getenv("AWS_REGION", "eu-west-1"),
"endpoint_url": os.getenv("AWS_ENDPOINT_URL", None),
},
)
path = zarrdir + "/" + TensorStore.gen_store_filename(coords=coords)
fs.mkdirs(path=path, exist_ok=True)
store = s3fs.mapping.S3Map(path, fs, check=False, create=True)
except Exception as e:
return Failure(OSError(
f"Unable to create file mapping for path '{path}'. "
"Ensure ZARRDIR environment variable is specified correctly, "
"and AWS credentials are discoverable by botocore. "
f"Error context: {e}",
))
store_result = cls._create_zarrstore_s3(zarrdir, filename)
store, path = store_result.unwrap() # Can do this as exceptions are caught
else:
path = pathlib.Path("/".join((zarrdir, filename))).expanduser().as_posix()
store = zarr.storage.DirectoryStore(path)
except Exception as e:
return Failure(OSError(
f"Unable to create Zarr Store at dir '{zarrdir}'. "
f"Unable to create Directory Store at dir '{zarrdir}'. "
"Ensure ZARRDIR environment variable is specified correctly. "
f"Error context: {e}",
))

# Write the coordinates to a skeleton Zarr store
# * 'compute=False' enables only saving metadata
# * 'mode="w"' overwrites any existing store
log.info("initializing zarr store at '%s'", path)
# * 'mode="w-"' fails if it finds an existing store
da: xr.DataArray = coords.as_zeroed_dataarray(name=model)
encoding = {
model: {"write_empty_chunks": False},
Expand All @@ -177,12 +162,31 @@ def initialize_empty_store(
_ = da.to_zarr(
store=store,
compute=False,
mode="w",
mode="w-",
consolidated=True,
encoding=encoding,
)
log.info("Created blank zarr store at '%s'", path)
# Ensure the store is readable
store_da = xr.open_dataarray(store, engine="zarr")
except zarr.errors.ContainsGroupError:
store_da = xr.open_dataarray(store, engine="zarr")
if store_da.name != da.name: # TODO: Also check for equality of coordinates
return Failure(OSError(
f"Existing store at '{path}' is for a different model. "
"Delete the existing store or move it to a new location, "
"or choose a new location for the new store via ZARRDIR.",
))
log.info(f"Using existing store at '{path}'")
return Success(
cls(
name=model,
path=path,
coordinate_map=coords,
size_kb=store_da.nbytes // 1024,
encoding=encoding,
),
)
except Exception as e:
return Failure(
OSError(
Expand Down Expand Up @@ -320,6 +324,36 @@ def validate_store(self) -> ResultE[bool]:

return Success(True)

def delete_store(self) -> ResultE[None]:
    """Delete the store at ``self.path``.

    For S3-backed stores (path beginning with ``s3://``) the objects are
    removed recursively via s3fs, authenticating through botocore's
    credential discovery (honouring the ``AWS_REGION`` and
    ``AWS_ENDPOINT_URL`` environment variables). For local stores the
    directory tree is removed from disk.

    Returns:
        ``Success(None)`` if the store was deleted, otherwise a
        ``Failure`` wrapping an ``OSError`` describing why deletion
        was not possible.
    """
    if self.path.startswith("s3://"):
        import s3fs
        try:
            fs = s3fs.S3FileSystem(
                anon=False,
                client_kwargs={
                    "region_name": os.getenv("AWS_REGION", "eu-west-1"),
                    "endpoint_url": os.getenv("AWS_ENDPOINT_URL", None),
                },
            )
            fs.rm(self.path, recursive=True)
        except Exception as e:
            # NOTE: a trailing space is required here so the two
            # concatenated message fragments do not run together.
            return Failure(OSError(
                f"Unable to delete S3 store at path '{self.path}'. "
                "Ensure AWS credentials are correct and discoverable by botocore. "
                f"Error context: {e}",
            ))
    else:
        try:
            shutil.rmtree(self.path)
        except Exception as e:
            return Failure(OSError(
                f"Unable to delete store at path '{self.path}'. "
                f"Error context: {e}",
            ))
    log.info("Deleted zarr store at '%s'", self.path)
    return Success(None)

def scan_parameter_values(self, p: Parameter) -> ResultE[ParameterScanResult]:
"""Scan the values of a parameter in the store.
Expand Down Expand Up @@ -410,6 +444,47 @@ def missing_times(self) -> ResultE[list[dt.datetime]]:
missing_times.append(pd.Timestamp(it).to_pydatetime().replace(tzinfo=dt.UTC))
return Success(missing_times)

@staticmethod
def _create_zarrstore_s3(s3_folder: str, filename: str) \
        -> ResultE[tuple[MutableMapping, str]]:  # type: ignore
    """Create a mutable mapping to an S3 store.

    Authentication with S3 is done via botocore's credential discovery
    (honouring the ``AWS_REGION`` and ``AWS_ENDPOINT_URL`` environment
    variables).

    Args:
        s3_folder: Destination folder URI; must begin with ``s3://``.
        filename: Name of the store object to create beneath the folder.

    Returns:
        A tuple containing the store mapping and the path to the store,
        in a result object indicating success or failure.

    See Also:
        - https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials
    """
    import s3fs
    if not s3_folder.startswith("s3://"):
        return Failure(ValueError(
            "S3 folder path must start with 's3://'. "
            f"Got: {s3_folder}",
        ))
    # Compute the path before entering the try block: the except handler
    # below interpolates it into its error message, and assigning it
    # inside the try would leave it unbound (NameError) should the
    # S3FileSystem constructor itself raise.
    path: str = s3_folder + "/" + filename
    log.debug("Attempting AWS connection using credential discovery")
    try:
        fs = s3fs.S3FileSystem(
            anon=False,
            client_kwargs={
                "region_name": os.getenv("AWS_REGION", "eu-west-1"),
                "endpoint_url": os.getenv("AWS_ENDPOINT_URL", None),
            },
        )
        fs.mkdirs(path=path, exist_ok=True)
        store = s3fs.mapping.S3Map(path, fs, check=False, create=True)
    except Exception as e:
        return Failure(OSError(
            f"Unable to create file mapping for path '{path}'. "
            "Ensure ZARRDIR environment variable is specified correctly, "
            "and AWS credentials are discoverable by botocore. "
            f"Error context: {e}",
        ))
    return Success((store, path))

@staticmethod
def gen_store_filename(coords: NWPDimensionCoordinateMap) -> str:
"""Create a filename for the store.
Expand Down
42 changes: 8 additions & 34 deletions src/nwp_consumer/internal/entities/test_tensorstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime as dt
import logging
import os
import shutil
import unittest
from collections.abc import Generator
from unittest.mock import patch
Expand All @@ -14,6 +13,7 @@
from botocore.session import Session
from moto.server import ThreadedMotoServer
from returns.pipeline import is_successful
from returns.result import Success

from .coordinates import NWPDimensionCoordinateMap
from .parameters import Parameter
Expand Down Expand Up @@ -83,49 +83,23 @@ def store(self, year: int) -> Generator[TensorStore, None, None]:
repository="dummy_repository",
coords=test_coords,
)
self.assertTrue(
is_successful(init_result),
msg=f"Unable to initialize store: {init_result}",
)
self.assertIsInstance(init_result, Success, msg=init_result)
store = init_result.unwrap()
yield store
shutil.rmtree(store.path)
store.delete_store()

@patch.dict(os.environ, {
"AWS_ENDPOINT_URL": "http://localhost:5000",
"AWS_ACCESS_KEY_ID": "test-key",
"AWS_SECRET_ACCESS_KEY": "test-secret",
"ZARRDIR": "s3://test-bucket/data",
}, clear=True)
def test_initialize_empty_store_s3(self) -> None:
def test_initialize_and_delete_s3(self) -> None:
"""Test the initialize_empty_store method."""

test_coords: NWPDimensionCoordinateMap = NWPDimensionCoordinateMap(
init_time=[
dt.datetime(2024, 1, 1, h, tzinfo=dt.UTC)
for h in [0, 6, 12, 18]
],
step=[1, 2, 3, 4],
variable=[Parameter.TEMPERATURE_SL],
latitude=np.linspace(90, -90, 12).tolist(),
longitude=np.linspace(0, 360, 18).tolist(),
)

with MockS3Bucket():
init_result = TensorStore.initialize_empty_store(
model="test_da",
repository="dummy_repository",
coords=test_coords,
)
self.assertTrue(is_successful(init_result))

# Assert it overwrites existing stores successfully
init_result = TensorStore.initialize_empty_store(
model="new_test_da",
repository="dummy_repository",
coords=test_coords,
)
self.assertTrue(is_successful(init_result))
with MockS3Bucket(), self.store(year=2022) as ts:
delete_result = ts.delete_store()
self.assertIsInstance(delete_result, Success, msg=delete_result)

def test_write_to_region(self) -> None:
"""Test the write_to_region method."""
Expand All @@ -146,7 +120,7 @@ def test_write_to_region(self) -> None:
test_da["init_time"] == it, drop=True,
).where(test_da["step"] == step, drop=True),
)
self.assertTrue(is_successful(write_result), msg=write_result)
self.assertIsInstance(write_result, Success, msg=write_result)

def test_postprocess(self) -> None:
"""Test the postprocess method."""
Expand Down
5 changes: 2 additions & 3 deletions src/nwp_consumer/internal/ports/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import abc
import datetime as dt
import pathlib

from returns.result import ResultE

Expand All @@ -25,7 +24,7 @@ class ConsumeUseCase(abc.ABC):


@abc.abstractmethod
def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]:
def consume(self, it: dt.datetime | None = None) -> ResultE[str]:
"""Consume NWP data to Zarr format for desired init time.
Where possible the implementation should be as memory-efficient as possible.
Expand Down Expand Up @@ -61,7 +60,7 @@ class ArchiveUseCase(abc.ABC):
"""

@abc.abstractmethod
def archive(self, year: int, month: int) -> ResultE[pathlib.Path]:
def archive(self, year: int, month: int) -> ResultE[str]:
"""Archive NWP data to Zarr format for the given month.
Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,15 @@ def fetch_init_data(
headers=self._headers,
method="GET",
)
log.debug(
f"Calling MetOffice Datahub at '{req.get_full_url()}'",
)

# Request the list of files
try:
response: http.client.HTTPResponse = urllib.request.urlopen(req, timeout=30) # noqa: S310
except Exception as e:
yield delayed(Failure(
yield delayed(Failure)(OSError(
"Unable to list files from MetOffice DataHub for order "
f"{self.order_id} at '{self.request_url}'. "
f"Ensure API key and Order ID are correct. Error context: {e}",
Expand All @@ -161,7 +164,7 @@ def fetch_init_data(
response.read().decode(response.info().get_param("charset") or "utf-8"), # type: ignore
)
except Exception as e:
yield delayed(Failure(
yield delayed(Failure)(ValueError(
"Unable to decode JSON response from MetOffice DataHub. "
"Check the response from the '/latest' endpoint looks as expected. "
f"Error context: {e}",
Expand Down
Loading

0 comments on commit b478038

Please sign in to comment.