From 7ee73cd0cfef42c9741436189f62ffdc3667cf95 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:25:41 +0000 Subject: [PATCH] fix(archiver): Allow for list of dataarrays (#210) --- .../internal/services/archiver_service.py | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/nwp_consumer/internal/services/archiver_service.py b/src/nwp_consumer/internal/services/archiver_service.py index 94901f1b..08b43e22 100644 --- a/src/nwp_consumer/internal/services/archiver_service.py +++ b/src/nwp_consumer/internal/services/archiver_service.py @@ -89,27 +89,42 @@ def archive(self, year: int, month: int) -> ResultE[str]: if os.getenv("CONCURRENCY", "True").capitalize() == "False": n_jobs = 1 log.debug(f"Downloading using {n_jobs} concurrent thread(s)") - da_result_generator = Parallel( + fetch_result_generator = Parallel( n_jobs=n_jobs, prefer="threads", return_as="generator_unordered", )(amr.fetch_init_data(it=it)) # Regionally write the results of the generator as they are ready - for da_result in da_result_generator: - write_result = da_result.bind(store.write_to_region) - # Fail soft if a region fails to write - if isinstance(write_result, Failure): - log.error(f"Failed to write time {it:%Y-%m-%d %H:%M}: {write_result}") + for fetch_result in fetch_result_generator: + if isinstance(fetch_result, Failure): + log.error( + f"Error fetching data for init time '{it:%Y-%m-%d %H:%M}' " + f"and model {self.mr.model().name}: {fetch_result.failure()!s}", + ) failed_times.append(it) + continue + for da in fetch_result.unwrap(): + write_result = store.write_to_region(da) + # Fail soft if a region fails to write + if isinstance(write_result, Failure): + log.error(f"Failed to write time {it:%Y-%m-%d %H:%M}: {write_result}") + failed_times.append(it) - del da_result_generator + del fetch_result_generator # Add the failed times to the store's metadata store.update_attrs({ "failed_times": ", ".join([t.strftime("Day %d %H:%M") for t in failed_times]), }) + if len(failed_times) == len(missing_times_result.unwrap()): + store.delete_store() + return Failure(OSError( + "Failed to write any regions for all init times. " + "Check error logs for details.", + )) + # Postprocess the dataset as required # postprocess_result = store.postprocess(self._mr.metadata().postprocess_options) # if isinstance(postprocess_result, Failure):