diff --git a/modules/data_processing/forcings.py b/modules/data_processing/forcings.py
index 29a9b50..ccc9ff7 100644
--- a/modules/data_processing/forcings.py
+++ b/modules/data_processing/forcings.py
@@ -223,7 +223,12 @@ def compute_zonal_stats(
             concatenated_da.to_dataset(name=variable).to_netcdf(forcings_dir/ "temp" / f"{variable}_{i}.nc")
         # Merge the chunks back together
         datasets = [xr.open_dataset(forcings_dir / "temp" / f"{variable}_{i}.nc") for i in range(len(time_chunks))]
-        xr.concat(datasets, dim="time").to_netcdf(forcings_dir / f"{variable}.nc")
+        result = xr.concat(datasets, dim="time")
+        result.to_netcdf(forcings_dir / f"{variable}.nc")
+        # close the datasets
+        result.close()
+        _ = [dataset.close() for dataset in datasets]
+
         for file in forcings_dir.glob("temp/*.nc"):
             file.unlink()
         progress.remove_task(chunk_task)
@@ -237,6 +242,7 @@ def compute_zonal_stats(
     )
     write_outputs(forcings_dir, variables)
 
+
 def write_outputs(forcings_dir, variables):
 
     # start a dask cluster if there isn't one already running
@@ -289,9 +295,9 @@ def write_outputs(forcings_dir, variables):
     final_ds["Time"].attrs["epoch_start"] = "01/01/1970 00:00:00"  # not needed but suppresses the ngen warning
 
     final_ds.to_netcdf(output_folder / "forcings.nc", engine="netcdf4")
-    # delete the individual variable files
-    for file in forcings_dir.glob("*.nc"):
-        file.unlink()
+    # close the datasets
+    _ = [result.close() for result in results]
+    final_ds.close()
 
 
 def setup_directories(cat_id: str) -> file_paths:
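
For reviewers, the intent of the first hunk (concatenate the per-chunk files, write the merged result, then release every file handle before the temp files are unlinked) can be sketched roughly as below. This is an illustration against stock xarray, not the repo's actual code; `merge_variable_chunks` and `n_chunks` are hypothetical names introduced here.

```python
# Illustrative sketch (not the repo's code) of the close-before-unlink
# pattern introduced in the first hunk. Assumes stock xarray and pathlib;
# merge_variable_chunks and n_chunks are hypothetical names.
from pathlib import Path

import xarray as xr


def merge_variable_chunks(forcings_dir: Path, variable: str, n_chunks: int) -> None:
    # Open each per-chunk file written earlier in compute_zonal_stats
    datasets = [
        xr.open_dataset(forcings_dir / "temp" / f"{variable}_{i}.nc")
        for i in range(n_chunks)
    ]
    try:
        # Concatenate along time and write the merged per-variable file
        result = xr.concat(datasets, dim="time")
        result.to_netcdf(forcings_dir / f"{variable}.nc")
        result.close()
    finally:
        # Close the chunk handles before deleting the temp files:
        # unlinking a netCDF file that is still open can fail on Windows
        # and leaks file descriptors elsewhere
        for ds in datasets:
            ds.close()
    for file in (forcings_dir / "temp").glob("*.nc"):
        file.unlink()
```

The diff itself closes the chunk handles via `_ = [dataset.close() for dataset in datasets]`, a list comprehension used for its side effects; a plain `for` loop, as in the sketch, expresses the same cleanup more idiomatically.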