From b4780387c6e9680146103265fb9127035cac95bd Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Tue, 19 Nov 2024 17:04:07 +0000 Subject: [PATCH] fix(tensorstore): Enable using existing stores --- .../internal/entities/parameters.py | 3 +- .../internal/entities/tensorstore.py | 135 +++++++++++--- .../internal/entities/test_tensorstore.py | 42 +---- src/nwp_consumer/internal/ports/services.py | 5 +- .../model_repositories/mo_datahub.py | 7 +- .../internal/services/archiver_service.py | 160 ++++++++-------- .../internal/services/consumer_service.py | 176 +++++++++--------- 7 files changed, 288 insertions(+), 240 deletions(-) diff --git a/src/nwp_consumer/internal/entities/parameters.py b/src/nwp_consumer/internal/entities/parameters.py index 67de5c8c..2c20ea68 100644 --- a/src/nwp_consumer/internal/entities/parameters.py +++ b/src/nwp_consumer/internal/entities/parameters.py @@ -137,7 +137,8 @@ def metadata(self) -> ParameterData: name=str(self), description="Downward shortwave radiation flux at ground level. " "Defined as the mean amount of solar radiation " - "incident on the surface expected over the next hour.", + "incident on the surface expected over the next hour. " + "This is made up of both direct and diffuse radiation.", units="W/m^2", limits=ParameterLimits(upper=1500, lower=0), alternate_shortnames=["swavr", "ssrd", "dswrf", "sdswrf"], diff --git a/src/nwp_consumer/internal/entities/tensorstore.py b/src/nwp_consumer/internal/entities/tensorstore.py index 480973a7..b71e3baf 100644 --- a/src/nwp_consumer/internal/entities/tensorstore.py +++ b/src/nwp_consumer/internal/entities/tensorstore.py @@ -5,6 +5,8 @@ chunked appropriately. This module provides a class for storing metadata about a Zarr store. + +TODO: 2024-11-20 This module wants refactoring into smaller testable components. 
""" import abc @@ -13,6 +15,8 @@ import logging import os import pathlib +import shutil +from collections.abc import MutableMapping from typing import Any import pandas as pd @@ -79,7 +83,8 @@ def initialize_empty_store( This method writes a blank dataarray to disk based on the input coordinates, which define the dimension labels and tick values of the output dataset object. - .. note: If a store already exists at the expected path, it will be overwritten! + .. note: If a store already exists at the expected path, + it is checked for consistency with the input coordinates and used if valid. The dataarray is 'blank' because it is written via:: @@ -126,47 +131,27 @@ def initialize_empty_store( ), ) - zarrdir = os.getenv("ZARRDIR", f"~/.local/cache/nwp/{repository}/{model}/data") store: zarr.storage.Store path: str + filename: str = TensorStore.gen_store_filename(coords=coords) try: - path = pathlib.Path( - "/".join((zarrdir, TensorStore.gen_store_filename(coords=coords))), - ).expanduser().as_posix() - store = zarr.storage.DirectoryStore(path) if zarrdir.startswith("s3"): - import s3fs - log.debug("Attempting AWS connection using credential discovery") - try: - fs = s3fs.S3FileSystem( - anon=False, - client_kwargs={ - "region_name": os.getenv("AWS_REGION", "eu-west-1"), - "endpoint_url": os.getenv("AWS_ENDPOINT_URL", None), - }, - ) - path = zarrdir + "/" + TensorStore.gen_store_filename(coords=coords) - fs.mkdirs(path=path, exist_ok=True) - store = s3fs.mapping.S3Map(path, fs, check=False, create=True) - except Exception as e: - return Failure(OSError( - f"Unable to create file mapping for path '{path}'. " - "Ensure ZARRDIR environment variable is specified correctly, " - "and AWS credentials are discoverable by botocore. 
" - f"Error context: {e}", - )) + store_result = cls._create_zarrstore_s3(zarrdir, filename) + store, path = store_result.unwrap() # Can do this as exceptions are caught + else: + path = pathlib.Path("/".join((zarrdir, filename))).expanduser().as_posix() + store = zarr.storage.DirectoryStore(path) except Exception as e: return Failure(OSError( - f"Unable to create Zarr Store at dir '{zarrdir}'. " + f"Unable to create Directory Store at dir '{zarrdir}'. " "Ensure ZARRDIR environment variable is specified correctly. " f"Error context: {e}", )) # Write the coordinates to a skeleton Zarr store # * 'compute=False' enables only saving metadata - # * 'mode="w"' overwrites any existing store - log.info("initializing zarr store at '%s'", path) + # * 'mode="w-"' fails if it finds an existing store da: xr.DataArray = coords.as_zeroed_dataarray(name=model) encoding = { model: {"write_empty_chunks": False}, @@ -177,12 +162,31 @@ def initialize_empty_store( _ = da.to_zarr( store=store, compute=False, - mode="w", + mode="w-", consolidated=True, encoding=encoding, ) + log.info("Created blank zarr store at '%s'", path) # Ensure the store is readable store_da = xr.open_dataarray(store, engine="zarr") + except zarr.errors.ContainsGroupError: + store_da = xr.open_dataarray(store, engine="zarr") + if store_da.name != da.name: # TODO: Also check for equality of coordinates + return Failure(OSError( + f"Existing store at '{path}' is for a different model. 
" + "Delete the existing store or move it to a new location, " + "or choose a new location for the new store via ZARRDIR.", + )) + log.info(f"Using existing store at '{path}'") + return Success( + cls( + name=model, + path=path, + coordinate_map=coords, + size_kb=store_da.nbytes // 1024, + encoding=encoding, + ), + ) except Exception as e: return Failure( OSError( @@ -320,6 +324,36 @@ def validate_store(self) -> ResultE[bool]: return Success(True) + def delete_store(self) -> ResultE[None]: + """Delete the store.""" + if self.path.startswith("s3://"): + import s3fs + try: + fs = s3fs.S3FileSystem( + anon=False, + client_kwargs={ + "region_name": os.getenv("AWS_REGION", "eu-west-1"), + "endpoint_url": os.getenv("AWS_ENDPOINT_URL", None), + }, + ) + fs.rm(self.path, recursive=True) + except Exception as e: + return Failure(OSError( + f"Unable to delete S3 store at path '{self.path}'." + "Ensure AWS credentials are correct and discoverable by botocore. " + f"Error context: {e}", + )) + else: + try: + shutil.rmtree(self.path) + except Exception as e: + return Failure(OSError( + f"Unable to delete store at path '{self.path}'. " + f"Error context: {e}", + )) + log.info("Deleted zarr store at '%s'", self.path) + return Success(None) + def scan_parameter_values(self, p: Parameter) -> ResultE[ParameterScanResult]: """Scan the values of a parameter in the store. @@ -410,6 +444,47 @@ def missing_times(self) -> ResultE[list[dt.datetime]]: missing_times.append(pd.Timestamp(it).to_pydatetime().replace(tzinfo=dt.UTC)) return Success(missing_times) + @staticmethod + def _create_zarrstore_s3(s3_folder: str, filename: str) \ + -> ResultE[tuple[MutableMapping, str]]: # type: ignore + """Create a mutable mapping to an S3 store. + + Authentication with S3 is done via botocore's credential discovery. + + Returns: + A tuple containing the store mapping and the path to the store, + in a result object indicating success or failure. 
+ + See Also: + - https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#configuring-credentials + """ + import s3fs + if not s3_folder.startswith("s3://"): + return Failure(ValueError( + "S3 folder path must start with 's3://'. " + f"Got: {s3_folder}", + )) + log.debug("Attempting AWS connection using credential discovery") + try: + fs = s3fs.S3FileSystem( + anon=False, + client_kwargs={ + "region_name": os.getenv("AWS_REGION", "eu-west-1"), + "endpoint_url": os.getenv("AWS_ENDPOINT_URL", None), + }, + ) + path = s3_folder + "/" + filename + fs.mkdirs(path=path, exist_ok=True) + store = s3fs.mapping.S3Map(path, fs, check=False, create=True) + except Exception as e: + return Failure(OSError( + f"Unable to create file mapping for path '{path}'. " + "Ensure ZARRDIR environment variable is specified correctly, " + "and AWS credentials are discoverable by botocore. " + f"Error context: {e}", + )) + return Success((store, path)) + @staticmethod def gen_store_filename(coords: NWPDimensionCoordinateMap) -> str: """Create a filename for the store. 
diff --git a/src/nwp_consumer/internal/entities/test_tensorstore.py b/src/nwp_consumer/internal/entities/test_tensorstore.py index 68844ebb..07f3590e 100644 --- a/src/nwp_consumer/internal/entities/test_tensorstore.py +++ b/src/nwp_consumer/internal/entities/test_tensorstore.py @@ -3,7 +3,6 @@ import datetime as dt import logging import os -import shutil import unittest from collections.abc import Generator from unittest.mock import patch @@ -14,6 +13,7 @@ from botocore.session import Session from moto.server import ThreadedMotoServer from returns.pipeline import is_successful +from returns.result import Success from .coordinates import NWPDimensionCoordinateMap from .parameters import Parameter @@ -83,13 +83,10 @@ def store(self, year: int) -> Generator[TensorStore, None, None]: repository="dummy_repository", coords=test_coords, ) - self.assertTrue( - is_successful(init_result), - msg=f"Unable to initialize store: {init_result}", - ) + self.assertIsInstance(init_result, Success, msg=init_result) store = init_result.unwrap() yield store - shutil.rmtree(store.path) + store.delete_store() @patch.dict(os.environ, { "AWS_ENDPOINT_URL": "http://localhost:5000", @@ -97,35 +94,12 @@ def store(self, year: int) -> Generator[TensorStore, None, None]: "AWS_SECRET_ACCESS_KEY": "test-secret", "ZARRDIR": "s3://test-bucket/data", }, clear=True) - def test_initialize_empty_store_s3(self) -> None: + def test_initialize_and_delete_s3(self) -> None: """Test the initialize_empty_store method.""" - test_coords: NWPDimensionCoordinateMap = NWPDimensionCoordinateMap( - init_time=[ - dt.datetime(2024, 1, 1, h, tzinfo=dt.UTC) - for h in [0, 6, 12, 18] - ], - step=[1, 2, 3, 4], - variable=[Parameter.TEMPERATURE_SL], - latitude=np.linspace(90, -90, 12).tolist(), - longitude=np.linspace(0, 360, 18).tolist(), - ) - - with MockS3Bucket(): - init_result = TensorStore.initialize_empty_store( - model="test_da", - repository="dummy_repository", - coords=test_coords, - ) - 
self.assertTrue(is_successful(init_result)) - - # Assert it overwrites existing stores successfully - init_result = TensorStore.initialize_empty_store( - model="new_test_da", - repository="dummy_repository", - coords=test_coords, - ) - self.assertTrue(is_successful(init_result)) + with MockS3Bucket(), self.store(year=2022) as ts: + delete_result = ts.delete_store() + self.assertIsInstance(delete_result, Success, msg=delete_result) def test_write_to_region(self) -> None: """Test the write_to_region method.""" @@ -146,7 +120,7 @@ def test_write_to_region(self) -> None: test_da["init_time"] == it, drop=True, ).where(test_da["step"] == step, drop=True), ) - self.assertTrue(is_successful(write_result), msg=write_result) + self.assertIsInstance(write_result, Success, msg=write_result) def test_postprocess(self) -> None: """Test the postprocess method.""" diff --git a/src/nwp_consumer/internal/ports/services.py b/src/nwp_consumer/internal/ports/services.py index 7c79fa27..f19a1888 100644 --- a/src/nwp_consumer/internal/ports/services.py +++ b/src/nwp_consumer/internal/ports/services.py @@ -8,7 +8,6 @@ import abc import datetime as dt -import pathlib from returns.result import ResultE @@ -25,7 +24,7 @@ class ConsumeUseCase(abc.ABC): @abc.abstractmethod - def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]: + def consume(self, it: dt.datetime | None = None) -> ResultE[str]: """Consume NWP data to Zarr format for desired init time. Where possible the implementation should be as memory-efficient as possible. @@ -61,7 +60,7 @@ class ArchiveUseCase(abc.ABC): """ @abc.abstractmethod - def archive(self, year: int, month: int) -> ResultE[pathlib.Path]: + def archive(self, year: int, month: int) -> ResultE[str]: """Archive NWP data to Zarr format for the given month. 
Args: diff --git a/src/nwp_consumer/internal/repositories/model_repositories/mo_datahub.py b/src/nwp_consumer/internal/repositories/model_repositories/mo_datahub.py index 9a3b7313..d2ae3213 100644 --- a/src/nwp_consumer/internal/repositories/model_repositories/mo_datahub.py +++ b/src/nwp_consumer/internal/repositories/model_repositories/mo_datahub.py @@ -145,12 +145,15 @@ def fetch_init_data( headers=self._headers, method="GET", ) + log.debug( + f"Calling MetOffice Datahub at '{req.get_full_url()}'", + ) # Request the list of files try: response: http.client.HTTPResponse = urllib.request.urlopen(req, timeout=30) # noqa: S310 except Exception as e: - yield delayed(Failure( + yield delayed(Failure)(OSError( "Unable to list files from MetOffice DataHub for order " f"{self.order_id} at '{self.request_url}'. " f"Ensure API key and Order ID are correct. Error context: {e}", @@ -161,7 +164,7 @@ def fetch_init_data( response.read().decode(response.info().get_param("charset") or "utf-8"), # type: ignore ) except Exception as e: - yield delayed(Failure( + yield delayed(Failure)(ValueError( "Unable to decode JSON response from MetOffice DataHub. " "Check the response from the '/latest' endpoint looks as expected. 
" f"Error context: {e}", diff --git a/src/nwp_consumer/internal/services/archiver_service.py b/src/nwp_consumer/internal/services/archiver_service.py index 7b8be59d..eea68b14 100644 --- a/src/nwp_consumer/internal/services/archiver_service.py +++ b/src/nwp_consumer/internal/services/archiver_service.py @@ -38,7 +38,7 @@ def __init__( self.nr = notification_repository @override - def archive(self, year: int, month: int) -> ResultE[pathlib.Path]: + def archive(self, year: int, month: int) -> ResultE[str]: monitor = entities.PerformanceMonitor() init_times = self.mr.repository().month_its(year=year, month=month) @@ -54,90 +54,86 @@ def archive(self, year: int, month: int) -> ResultE[pathlib.Path]: ), ) - match init_store_result: - case Failure(e): + if isinstance(init_store_result, Failure): monitor.join() # TODO: Make this a context manager instead return Failure(OSError( - f"Failed to initialize store for {year}-{month}: {e}"), + f"Failed to initialize store for {year}-{month}: {init_store_result!s}"), ) - case Success(store): - missing_times_result = store.missing_times() - if isinstance(missing_times_result, Failure): - monitor.join() - return Failure(missing_times_result.failure()) - log.info(f"{len(missing_times_result.unwrap())} missing init_times in store.") - - failed_times: list[dt.datetime] = [] - for n, it in enumerate(missing_times_result.unwrap()): - log.info( - f"Consuming data from {self.mr.repository().name} for {it:%Y-%m-%d %H:%M} " - f"(time {n + 1}/{len(missing_times_result.unwrap())})", - ) - - # Authenticate with the model repository - amr_result = self.mr.authenticate() - if isinstance(amr_result, Failure): - monitor.join() - return Failure(OSError( - "Unable to authenticate with model repository " - f"'{self.mr.repository().name}': " - f"{amr_result.failure()}", - )) - amr = amr_result.unwrap() - - # Create a generator to fetch and process raw data - n_jobs: int = max(cpu_count() - 1, self.mr.repository().max_connections) - if 
os.getenv("CONCURRENCY", "True").capitalize() == "False": - n_jobs = 1 - log.debug(f"Downloading using {n_jobs} concurrent thread(s)") - da_result_generator = Parallel( - n_jobs=n_jobs, - prefer="threads", - return_as="generator_unordered", - )(amr.fetch_init_data(it=it)) - - # Regionally write the results of the generator as they are ready - for da_result in da_result_generator: - write_result = da_result.bind(store.write_to_region) - # Fail soft if a region fails to write - if isinstance(write_result, Failure): - log.error(f"Failed to write time {it:%Y-%m-%d %H:%M}: {write_result}") - failed_times.append(it) - - del da_result_generator - - # Add the failed times to the store's metadata - store.update_attrs({ - "failed_times": [t.strftime("%d %H:%M") for t in failed_times], - }) - - # Postprocess the dataset as required - # postprocess_result = store.postprocess(self._mr.metadata().postprocess_options) - # if isinstance(postprocess_result, Failure): - # monitor.join() # TODO: Make this a context manager instead - # return Failure(postprocess_result.failure()) + store = init_store_result.unwrap() + + missing_times_result = store.missing_times() + if isinstance(missing_times_result, Failure): + monitor.join() + return Failure(missing_times_result.failure()) + log.info(f"{len(missing_times_result.unwrap())} missing init_times in store.") + + failed_times: list[dt.datetime] = [] + for n, it in enumerate(missing_times_result.unwrap()): + log.info( + f"Consuming data from {self.mr.repository().name} for {it:%Y-%m-%d %H:%M} " + f"(time {n + 1}/{len(missing_times_result.unwrap())})", + ) + # Authenticate with the model repository + amr_result = self.mr.authenticate() + if isinstance(amr_result, Failure): + store.delete_store() monitor.join() - notify_result = self.nr().notify( - message=entities.StoreCreatedNotification( - filename=pathlib.Path(store.path).name, - size_mb=store.size_kb // 1024, - performance=entities.PerformanceMetadata( - 
duration_seconds=monitor.get_runtime(), - memory_mb=max(monitor.memory_buffer) / 1e6, - ), - ), - ) - if isinstance(notify_result, Failure): - return Failure(OSError( - "Failed to notify of store creation: " - f"{notify_result.failure()}", - )) - - return Success(store.path) - - case _: - return Failure( - TypeError(f"Unexpected result type: {type(init_store_result)}"), - ) + return Failure(OSError( + "Unable to authenticate with model repository " + f"'{self.mr.repository().name}': " + f"{amr_result.failure()}", + )) + amr = amr_result.unwrap() + + # Create a generator to fetch and process raw data + n_jobs: int = max(cpu_count() - 1, self.mr.repository().max_connections) + if os.getenv("CONCURRENCY", "True").capitalize() == "False": + n_jobs = 1 + log.debug(f"Downloading using {n_jobs} concurrent thread(s)") + da_result_generator = Parallel( + n_jobs=n_jobs, + prefer="threads", + return_as="generator_unordered", + )(amr.fetch_init_data(it=it)) + + # Regionally write the results of the generator as they are ready + for da_result in da_result_generator: + write_result = da_result.bind(store.write_to_region) + # Fail soft if a region fails to write + if isinstance(write_result, Failure): + log.error(f"Failed to write time {it:%Y-%m-%d %H:%M}: {write_result}") + failed_times.append(it) + + del da_result_generator + + # Add the failed times to the store's metadata + store.update_attrs({ + "failed_times": ", ".join([t.strftime("Day %d %H:%M") for t in failed_times]), + }) + + # Postprocess the dataset as required + # postprocess_result = store.postprocess(self._mr.metadata().postprocess_options) + # if isinstance(postprocess_result, Failure): + # monitor.join() # TODO: Make this a context manager instead + # return Failure(postprocess_result.failure()) + + monitor.join() + notify_result = self.nr().notify( + message=entities.StoreCreatedNotification( + filename=pathlib.Path(store.path).name, + size_mb=store.size_kb // 1024, + performance=entities.PerformanceMetadata( + 
duration_seconds=monitor.get_runtime(), + memory_mb=max(monitor.memory_buffer) / 1e6, + ), + ), + ) + if isinstance(notify_result, Failure): + return Failure(OSError( + "Failed to notify of store creation: " + f"{notify_result.failure()}", + )) + + return Success(store.path) diff --git a/src/nwp_consumer/internal/services/consumer_service.py b/src/nwp_consumer/internal/services/consumer_service.py index 29aaca4f..46d603af 100644 --- a/src/nwp_consumer/internal/services/consumer_service.py +++ b/src/nwp_consumer/internal/services/consumer_service.py @@ -28,7 +28,6 @@ class ConsumerService(ports.ConsumeUseCase): def __init__( self, - # TODO: 2024-10-21 - Work out how to pass none instantiated class values through DI model_repository: type[ports.ModelRepository], notification_repository: type[ports.NotificationRepository], ) -> None: @@ -37,7 +36,11 @@ def __init__( self.nr = notification_repository @override - def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]: + def consume(self, it: dt.datetime | None = None) -> ResultE[str]: + # Note that the usage of the returns here is not in the spirit of + # 'railway orientated programming', mostly due to the number of + # generators involved - it seemed clearer to be explicit. However, + # it would be much neater to refactor this to be more functional. 
monitor = entities.PerformanceMonitor() if it is None: @@ -56,94 +59,91 @@ def consume(self, it: dt.datetime | None = None) -> ResultE[pathlib.Path]: coords=dataclasses.replace(self.mr.model().expected_coordinates, init_time=[it]), ) - match init_store_result: - case Failure(e): - monitor.join() # TODO: Make this a context manager instead - return Failure(OSError(f"Failed to initialize store for init time: {e}")) - case Success(store): - - # Create a generator to fetch and process raw data - amr_result = self.mr.authenticate() - if isinstance(amr_result, Failure): - monitor.join() - return Failure(OSError( - "Unable to authenticate with model repository " - f"'{self.mr.repository().name}': " - f"{amr_result.failure()}", - )) - amr = amr_result.unwrap() - - n_jobs: int = max(cpu_count() - 1, self.mr.repository().max_connections) - if os.getenv("CONCURRENCY", "True").capitalize() == "False": - n_jobs = 1 - log.debug(f"Downloading using {n_jobs} concurrent thread(s)") - fetch_result_generator = Parallel( - # TODO - fix segfault when using multiple threads - n_jobs=n_jobs, - prefer="threads", - return_as="generator_unordered", - )(amr.fetch_init_data(it=it)) - - # Regionally write the results of the generator as they are ready - failed_etls: int = 0 - for fetch_result in fetch_result_generator: - if isinstance(fetch_result, Failure): - log.error( - f"Error fetching data for init time '{it:%Y-%m-%d %H:%M}' " - f"and model {self.mr.repository().name}: {fetch_result.failure()!s}", - ) - failed_etls += 1 - continue - for da in fetch_result.unwrap(): - write_result = store.write_to_region(da) - if isinstance(write_result, Failure): - log.error( - f"Error writing data for init time '{it:%Y-%m-%d %H:%M}' " - f"and model {self.mr.repository().name}: " - f"{write_result.failure()!s}", - ) - failed_etls += 1 - - del fetch_result_generator - # Fail hard if any of the writes failed - # * TODO: Consider just how hard we want to fail in this instance - if failed_etls > 0: - 
monitor.join() - return Failure(OSError( - f"Failed to write {failed_etls} regions " - f"for init time '{it:%Y-%m-%d %H:%M}'. " - "See error logs for details.", - )) - - # Postprocess the dataset as required - postprocess_result = store.postprocess(self.mr.repository().postprocess_options) - if isinstance(postprocess_result, Failure): - monitor.join() # TODO: Make this a context manager instead - return Failure(postprocess_result.failure()) - - monitor.join() - notify_result = self.nr().notify( - message=entities.StoreCreatedNotification( - filename=pathlib.Path(store.path).name, - size_mb=store.size_kb // 1024, - performance=entities.PerformanceMetadata( - duration_seconds=monitor.get_runtime(), - memory_mb=max(monitor.memory_buffer) / 1e6, - ), - ), + if isinstance(init_store_result, Failure): + monitor.join() # TODO: Make this a context manager instead + return Failure(OSError( + f"Failed to initialize store for init time: {init_store_result!s}", + )) + store = init_store_result.unwrap() + + amr_result = self.mr.authenticate() + if isinstance(amr_result, Failure): + monitor.join() + store.delete_store() + return Failure(OSError( + "Unable to authenticate with model repository " + f"'{self.mr.repository().name}': " + f"{amr_result.failure()}", + )) + amr = amr_result.unwrap() + + # Create a generator to fetch and process raw data + n_jobs: int = max(cpu_count() - 1, self.mr.repository().max_connections) + if os.getenv("CONCURRENCY", "True").capitalize() == "False": + n_jobs = 1 + log.debug(f"Downloading using {n_jobs} concurrent thread(s)") + fetch_result_generator = Parallel( + n_jobs=n_jobs, + prefer="threads", + return_as="generator_unordered", + )(amr.fetch_init_data(it=it)) + + # Regionally write the results of the generator as they are ready + failed_etls: int = 0 + for fetch_result in fetch_result_generator: + if isinstance(fetch_result, Failure): + log.error( + f"Error fetching data for init time '{it:%Y-%m-%d %H:%M}' " + f"and model 
{self.mr.repository().name}: {fetch_result.failure()!s}", ) - if isinstance(notify_result, Failure): - return Failure(OSError( - "Failed to notify of store creation: " - f"{notify_result.failure()}", - )) - - return Success(store.path) + failed_etls += 1 + continue + for da in fetch_result.unwrap(): + write_result = store.write_to_region(da) + if isinstance(write_result, Failure): + log.error( + f"Error writing data for init time '{it:%Y-%m-%d %H:%M}' " + f"and model {self.mr.repository().name}: " + f"{write_result.failure()!s}", + ) + failed_etls += 1 + + del fetch_result_generator + # Fail hard if any of the writes failed + # * TODO: Consider just how hard we want to fail in this instance + if failed_etls > 0: + monitor.join() + store.delete_store() + return Failure(OSError( + f"Failed to write {failed_etls} regions " + f"for init time '{it:%Y-%m-%d %H:%M}'. " + "See error logs for details.", + )) + + # Postprocess the dataset as required + # postprocess_result = store.postprocess(self.mr.repository().postprocess_options) + # if isinstance(postprocess_result, Failure): + # monitor.join() # TODO: Make this a context manager instead + # return Failure(postprocess_result.failure()) + + monitor.join() + notify_result = self.nr().notify( + message=entities.StoreCreatedNotification( + filename=pathlib.Path(store.path).name, + size_mb=store.size_kb // 1024, # TODO: 2024-11-19 check this is right + performance=entities.PerformanceMetadata( + duration_seconds=monitor.get_runtime(), + memory_mb=max(monitor.memory_buffer) / 1e6, + ), + ), + ) + if isinstance(notify_result, Failure): + return Failure(OSError( + "Failed to notify of store creation: " + f"{notify_result.failure()}", + )) - case _: - return Failure( - TypeError(f"Unexpected result type: {type(init_store_result)}"), - ) + return Success(store.path) @override def postprocess(self, options: entities.PostProcessOptions) -> ResultE[str]: