From 85cabaef2ab6c115cc15ad47a3733e90af73a72a Mon Sep 17 00:00:00 2001 From: Simon Adamov Date: Wed, 27 Mar 2024 12:06:12 +0100 Subject: [PATCH] bugfixes, better communication protocol (hsn0) more workers :) --- create_zarr_archive.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/create_zarr_archive.py b/create_zarr_archive.py index 85f20844..bee18f2e 100644 --- a/create_zarr_archive.py +++ b/create_zarr_archive.py @@ -1,6 +1,7 @@ # Standard library import argparse import os +from math import sqrt # Third-party import xarray as xr @@ -55,7 +56,7 @@ def process_grib_files(data_in, selected_vars, indexpath): coords="minimal", compat="override", ) - datasets.append(ds[vars]) + datasets.append(ds[variables]) return xr.merge(datasets, compat="minimal") @@ -69,7 +70,7 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2): if root.endswith("det") and f.startswith("laf") ] - print("Processing training data_out") + print("Processing Data") ds = xr.merge( [ process_grib_files(all_files, selected_vars, indexpath), @@ -80,7 +81,11 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2): print(f"Saving Zarr to {data_out}") with ProgressBar(): - ds = ds.assign_coords(x=ds.x, y=ds.y).drop_vars(["valid_time", "step"]) + ds = ( + ds.assign_coords(x=ds.x, y=ds.y) + .drop_vars(["valid_time", "step"]) + .chunk({"time": 1, "level": 1}) + ) ds.to_zarr( data_out, consolidated=True, @@ -111,6 +116,7 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2): default="/scratch/mch/sadamov/temp", help="Path to the index file.", ) + args = parser.parse_args() SELECTED_VARS = { "heightAboveGround": ["U_10M", "V_10M"], @@ -123,26 +129,28 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2): # U,V have different lat/lon. T_2M has different heightAboveGround "shortName": ["T_2M", "U", "V"], } - - args = parser.parse_args() + JOBS = 4 + CORES = 256 + processes = int(sqrt(CORES)) + workers = JOBS * processes cluster = SLURMCluster( queue="postproc", account="s83", - processes=4, - cores=64, - memory="111GB", + processes=16, + cores=CORES, + memory="444GB", local_directory="/scratch/mch/sadamov/temp", shared_temp_directory="/scratch/mch/sadamov/temp", log_directory="lightning_logs", shebang="#!/bin/bash", - interface="nmn0", + interface="hsn0", walltime="5-00:00:00", job_extra_directives=["--exclusive"], ) - cluster.scale(jobs=2) + cluster.scale(jobs=JOBS) client = Client(cluster) - client.wait_for_workers(2) + client.wait_for_workers(workers) main( args.data_in,