From 8213b6a9460bf91b5cea564d9a5783d9e8b9253a Mon Sep 17 00:00:00 2001
From: Jordan Laser
Date: Tue, 30 Jan 2024 10:03:23 -0700
Subject: [PATCH] multithread tar

---
 .../src/forcingprocessor/forcingprocessor.py | 21 +++++++++++--------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/forcingprocessor/src/forcingprocessor/forcingprocessor.py b/forcingprocessor/src/forcingprocessor/forcingprocessor.py
index 44235d0a..4b27dd0b 100644
--- a/forcingprocessor/src/forcingprocessor/forcingprocessor.py
+++ b/forcingprocessor/src/forcingprocessor/forcingprocessor.py
@@ -71,6 +71,12 @@ def load_balance(items_per_proc,launch_delay,single_ex, exec_count):
     Python takes a couple seconds to launch a process so if this script
     is launched with 10's of processes, it may not be optimal to distribute
     the work evenly. This function minimizes projected processing time
+
+    items_per_proc : list with one element per process, giving the number of items assigned to that process
+    launch_delay : time in seconds Python takes to launch a process
+    single_ex : time in seconds to process one item
+    exec_count : number of items processed per execution
+
     """
     nprocs = len(items_per_proc)
     nitems = np.sum(items_per_proc)
@@ -114,8 +120,8 @@ def multiprocess_data_extract(files,nprocs,crosswalk_dict,fs):
     Sets up the multiprocessing pool for forcing_grid2catchment
     and returns the data and time axis ordered in time
     """
-    launch_time = 2
-    cycle_time = 60
+    launch_time = 2.5
+    cycle_time = 48
     files_per_cycle = 1
     files_per_proc = distribute_work(files,nprocs)
     files_per_proc = load_balance(files_per_proc,launch_time,cycle_time,files_per_cycle)
@@ -215,7 +221,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
             nvar = len(nwm_variables)
             data_array = np.zeros((nvar,ncatch), dtype=np.float32)
             jcatch = 0
-            for key, value in crosswalk_dict.items(): 
+            for key, value in crosswalk_dict.items():
                 data_array[:,jcatch] = np.nanmean(data_allvars[:, value[0], value[1]], axis=1)
                 jcatch += 1
             data_list.append(data_array)
@@ -470,7 +476,7 @@ def prep_ngen_data(conf):
     for x in msg:
         print(x, end='')
         sys.stdout.flush()
-        time.sleep(0.1)
+        time.sleep(0.05)
     print('\n')
 
     t_extract = 0
@@ -791,13 +797,10 @@ def prep_ngen_data(conf):
                         combined_tar.add(tmpfile.name, arcname=jfilename)
             else:
                 path = str(metaf_path)
-                meta_rel_forcings = "../metadata/forcings_metadata/"
-                combined_tar_filename_pre = str(bucket_path) + '/forcings.tar.gz'
                 combined_tar_filename = str(forcing_path) + '/forcings.tar.gz'
-                tar_cmd = f'tar -czf {combined_tar_filename_pre} -C {forcing_path} .'
-                if ii_collect_stats: tar_cmd += f' -C {meta_rel_forcings} .'
+                tar_cmd = f'tar cf - {forcing_path} {metaf_path} | pigz > forcings.tar.gz'
                 os.system(tar_cmd)
-                os.system(f'mv {combined_tar_filename_pre} {combined_tar_filename}')
+                os.system(f'mv forcings.tar.gz {combined_tar_filename}')
                 tar_time = time.perf_counter() - t0000
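
--
A note on load_balance: the patch documents its inputs, but the function
body is not shown here. Below is a minimal sketch of the kind of projection
the docstring describes, assuming process i becomes available i*launch_delay
seconds after launch and that items are shifted greedily from the latest
finisher to the earliest; the name load_balance_sketch and the rebalancing
rule are assumptions for illustration, not the repo's implementation.

    import numpy as np

    def load_balance_sketch(items_per_proc, launch_delay, single_ex, exec_count):
        # Projected finish time per process: staggered start + serial item work.
        items = np.array(items_per_proc, dtype=float)
        starts = launch_delay * np.arange(len(items))
        per_item = single_ex / exec_count
        finish = starts + items * per_item
        # Move one item at a time from the latest finisher to the earliest
        # until no single move can shrink the projected makespan further.
        while True:
            hi, lo = np.argmax(finish), np.argmin(finish)
            if items[hi] < 1 or finish[hi] - finish[lo] <= per_item:
                break
            items[hi] -= 1; items[lo] += 1
            finish[hi] -= per_item; finish[lo] += per_item
        return items.astype(int).tolist()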
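
On the tar change itself: the patch replaces a single-threaded 'tar -czf'
with a pipeline that streams the archive through pigz, a parallel gzip
implementation that compresses on all available cores. Below is a minimal
sketch of the same pattern using subprocess pipes rather than os.system,
assuming pigz is installed; parallel_targz and the example paths are
illustrative, not part of the patch.

    import shutil
    import subprocess

    def parallel_targz(src_dirs, out_path):
        # tar streams the archive to stdout; pigz compresses it in parallel.
        if shutil.which("pigz") is None:
            raise RuntimeError("pigz not found on PATH")
        with open(out_path, "wb") as out:
            tar = subprocess.Popen(["tar", "cf", "-", *src_dirs],
                                   stdout=subprocess.PIPE)
            pigz = subprocess.Popen(["pigz"], stdin=tar.stdout, stdout=out)
            tar.stdout.close()  # let tar receive SIGPIPE if pigz exits early
            if pigz.wait() != 0 or tar.wait() != 0:
                raise RuntimeError("tar | pigz pipeline failed")

    # Usage (hypothetical paths):
    # parallel_targz(["forcings", "metadata/forcings_metadata"], "forcings.tar.gz")

As in the patch, writing the archive outside the source tree and moving it
into place afterward avoids tar trying to read the partially written archive
when the destination sits inside one of the directories being archived.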