Skip to content

Commit

Permalink
splitting up command
Browse files Browse the repository at this point in the history
  • Loading branch information
peterdudfield committed Oct 18, 2024
1 parent a6d0010 commit 575f09b
Showing 1 changed file with 6 additions and 5 deletions.
11 changes: 6 additions & 5 deletions src/nwp_consumer/internal/service/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,12 @@ def CreateLatestZarr(self) -> list[pathlib.Path]:
# * Then cache the dataset as a zarr file and store it in the store
bag: dask.bag.Bag = dask.bag.from_sequence(cachedPaths)
log.info('Made dask bag')
latestDataset = (
bag.map(lambda tfp: self.fetcher.mapCachedRaw(p=tfp))
.fold(lambda ds1, ds2: _mergeDatasets([ds1, ds2]))
.compute()
)
latestDataset = bag.map(lambda tfp: self.fetcher.mapCachedRaw(p=tfp))
log.info('Doing fold')
latestDataset = latestDataset.fold(lambda ds1, ds2: _mergeDatasets([ds1, ds2]))
log.info('Doing compute')
latestDataset = latestDataset.compute()

log.info('Made latest Dataset')
if not _dataQualityFilter(ds=latestDataset):
return []
Expand Down

0 comments on commit 575f09b

Please sign in to comment.