Commit 7ee73cd
fix(archiver): Allow for list of dataarrays (#210)
devsjc authored Dec 6, 2024
1 parent 35b05b1 commit 7ee73cd
Showing 1 changed file with 22 additions and 7 deletions.
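Note: before this change, each item yielded by the fetch generator was treated as a single DataArray result and piped straight into store.write_to_region via .bind(). The fix allows a fetch to return a list of DataArrays per init time: the loop now checks the fetch result itself, unwraps the list on success, and writes each array to its region individually, failing soft per init time. A minimal sketch of that pattern, assuming the dry-python returns Result container; fetch_init_data_stub and write_to_region_stub are hypothetical stand-ins, not the repository's functions:

import xarray as xr
from returns.result import Failure, ResultE, Success

def fetch_init_data_stub(it: str) -> ResultE[list[xr.DataArray]]:
    """Hypothetical stand-in: a fetch now yields a *list* of DataArrays per init time."""
    return Success([xr.DataArray([1.0], name="temp"), xr.DataArray([2.0], name="rh")])

def write_to_region_stub(da: xr.DataArray) -> ResultE[str]:
    """Hypothetical stand-in: write one array, reporting success or failure."""
    return Success(str(da.name))

fetch_result = fetch_init_data_stub("2024-12-01 00:00")
if isinstance(fetch_result, Failure):
    # Fail soft: record the init time and move on, as the new loop does
    print(f"fetch failed: {fetch_result.failure()!s}")
else:
    # Unwrap the list and write each array individually
    for da in fetch_result.unwrap():
        write_result = write_to_region_stub(da)
        if isinstance(write_result, Failure):
            print(f"write failed: {write_result}")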
29 changes: 22 additions & 7 deletions src/nwp_consumer/internal/services/archiver_service.py
@@ -89,27 +89,42 @@ def archive(self, year: int, month: int) -> ResultE[str]:
             if os.getenv("CONCURRENCY", "True").capitalize() == "False":
                 n_jobs = 1
             log.debug(f"Downloading using {n_jobs} concurrent thread(s)")
-            da_result_generator = Parallel(
+            fetch_result_generator = Parallel(
                 n_jobs=n_jobs,
                 prefer="threads",
                 return_as="generator_unordered",
             )(amr.fetch_init_data(it=it))

             # Regionally write the results of the generator as they are ready
-            for da_result in da_result_generator:
-                write_result = da_result.bind(store.write_to_region)
-                # Fail soft if a region fails to write
-                if isinstance(write_result, Failure):
-                    log.error(f"Failed to write time {it:%Y-%m-%d %H:%M}: {write_result}")
+            for fetch_result in fetch_result_generator:
+                if isinstance(fetch_result, Failure):
+                    log.error(
+                        f"Error fetching data for init time '{it:%Y-%m-%d %H:%M}' "
+                        f"and model {self.mr.model().name}: {fetch_result.failure()!s}",
+                    )
                     failed_times.append(it)
+                    continue
+                for da in fetch_result.unwrap():
+                    write_result = store.write_to_region(da)
+                    # Fail soft if a region fails to write
+                    if isinstance(write_result, Failure):
+                        log.error(f"Failed to write time {it:%Y-%m-%d %H:%M}: {write_result}")
+                        failed_times.append(it)

-            del da_result_generator
+            del fetch_result_generator

         # Add the failed times to the store's metadata
         store.update_attrs({
             "failed_times": ", ".join([t.strftime("Day %d %H:%M") for t in failed_times]),
         })

         if len(failed_times) == len(missing_times_result.unwrap()):
             store.delete_store()
             return Failure(OSError(
                 "Failed to write any regions for all init times. "
                 "Check error logs for details.",
             ))

         # Postprocess the dataset as required
         # postprocess_result = store.postprocess(self._mr.metadata().postprocess_options)
         # if isinstance(postprocess_result, Failure):
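Note: the Parallel call above uses return_as="generator_unordered" (available in joblib 1.4+), so results are yielded in completion order and each init time's data can be written as soon as its fetch finishes. A toy sketch of that behaviour, unrelated to this repository's code:

import time
from joblib import Parallel, delayed

def fetch(i: int) -> int:
    time.sleep(0.1 * (3 - i))  # later-submitted tasks finish sooner here
    return i

result_generator = Parallel(
    n_jobs=3,
    prefer="threads",
    return_as="generator_unordered",
)(delayed(fetch)(i) for i in range(3))

for result in result_generator:
    print(result)  # completion order (likely 2, 1, 0), not submission order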
