Skip to content

Commit

Permalink
Bug fixes,
Browse files Browse the repository at this point in the history
better communication protocol (hsn0)
more workers :)
  • Loading branch information
sadamov committed Mar 27, 2024
1 parent db1b0d1 commit 85cabae
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions create_zarr_archive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Standard library
import argparse
import os
from math import sqrt

# Third-party
import xarray as xr
Expand Down Expand Up @@ -55,7 +56,7 @@ def process_grib_files(data_in, selected_vars, indexpath):
coords="minimal",
compat="override",
)
datasets.append(ds[vars])
datasets.append(ds[variables])
return xr.merge(datasets, compat="minimal")


Expand All @@ -69,7 +70,7 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2):
if root.endswith("det") and f.startswith("laf")
]

print("Processing training data_out")
print("Processing Data")
ds = xr.merge(
[
process_grib_files(all_files, selected_vars, indexpath),
Expand All @@ -80,7 +81,11 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2):

print(f"Saving Zarr to {data_out}")
with ProgressBar():
ds = ds.assign_coords(x=ds.x, y=ds.y).drop_vars(["valid_time", "step"])
ds = (
ds.assign_coords(x=ds.x, y=ds.y)
.drop_vars(["valid_time", "step"])
.chunk({"time": 1, "level": 1})
)
ds.to_zarr(
data_out,
consolidated=True,
Expand Down Expand Up @@ -111,6 +116,7 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2):
default="/scratch/mch/sadamov/temp",
help="Path to the index file.",
)
args = parser.parse_args()

SELECTED_VARS = {
"heightAboveGround": ["U_10M", "V_10M"],
Expand All @@ -123,26 +129,28 @@ def main(data_in, data_out, indexpath, selected_vars, selected_vars_2):
# U,V have different lat/lon. T_2M has different heightAboveGround
"shortName": ["T_2M", "U", "V"],
}

args = parser.parse_args()
JOBS = 4
CORES = 256
processes = int(sqrt(CORES))
workers = JOBS * processes

cluster = SLURMCluster(
queue="postproc",
account="s83",
processes=4,
cores=64,
memory="111GB",
processes=16,
cores=CORES,
memory="444GB",
local_directory="/scratch/mch/sadamov/temp",
shared_temp_directory="/scratch/mch/sadamov/temp",
log_directory="lightning_logs",
shebang="#!/bin/bash",
interface="nmn0",
interface="hsn0",
walltime="5-00:00:00",
job_extra_directives=["--exclusive"],
)
cluster.scale(jobs=2)
cluster.scale(jobs=JOBS)
client = Client(cluster)
client.wait_for_workers(2)
client.wait_for_workers(workers)

main(
args.data_in,
Expand Down

0 comments on commit 85cabae

Please sign in to comment.