Skip to content

Commit

Permalink
multithread tar
Browse files Browse the repository at this point in the history
  • Loading branch information
JordanLaserGit committed Jan 30, 2024
1 parent 4cf4a41 commit 8213b6a
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions forcingprocessor/src/forcingprocessor/forcingprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def load_balance(items_per_proc,launch_delay,single_ex, exec_count):
Python takes a couple seconds to launch a process so if this script is launched with 10's
of processes, it may not be optimal to distribute the work evenly.
This function minimizes projected processing time
items_per_proc : list of length number of processes with each element representing the number of items the process has been assigned
launch_delay : time in seconds it takes python to launch the function
single_ex : time in seconds it takes to process 1 item
exec_count : number of items processed per execution
"""
nprocs = len(items_per_proc)
nitems = np.sum(items_per_proc)
Expand Down Expand Up @@ -114,8 +120,8 @@ def multiprocess_data_extract(files,nprocs,crosswalk_dict,fs):
Sets up the multiprocessing pool for forcing_grid2catchment and returns the data and time axis ordered in time
"""
launch_time = 2
cycle_time = 60
launch_time = 2.5
cycle_time = 48
files_per_cycle = 1
files_per_proc = distribute_work(files,nprocs)
files_per_proc = load_balance(files_per_proc,launch_time,cycle_time,files_per_cycle)
Expand Down Expand Up @@ -215,7 +221,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, fs=None):
nvar = len(nwm_variables)
data_array = np.zeros((nvar,ncatch), dtype=np.float32)
jcatch = 0
for key, value in crosswalk_dict.items():
for key, value in crosswalk_dict.items():
data_array[:,jcatch] = np.nanmean(data_allvars[:, value[0], value[1]], axis=1)
jcatch += 1
data_list.append(data_array)
Expand Down Expand Up @@ -470,7 +476,7 @@ def prep_ngen_data(conf):
for x in msg:
print(x, end='')
sys.stdout.flush()
time.sleep(0.1)
time.sleep(0.05)
print('\n')

t_extract = 0
Expand Down Expand Up @@ -791,13 +797,10 @@ def prep_ngen_data(conf):
combined_tar.add(tmpfile.name, arcname=jfilename)
else:
path = str(metaf_path)
meta_rel_forcings = "../metadata/forcings_metadata/"
combined_tar_filename_pre = str(bucket_path) + '/forcings.tar.gz'
combined_tar_filename = str(forcing_path) + '/forcings.tar.gz'
tar_cmd = f'tar -czf {combined_tar_filename_pre} -C {forcing_path} .'
if ii_collect_stats: tar_cmd += f' -C {meta_rel_forcings} .'
tar_cmd = f'tar cf - {forcing_path} {metaf_path} | pigz > forcings.tar.gz'
os.system(tar_cmd)
os.system(f'mv {combined_tar_filename_pre} {combined_tar_filename}')
os.system(f'mv forcings.tar.gz {combined_tar_filename}')

tar_time = time.perf_counter() - t0000

Expand Down

0 comments on commit 8213b6a

Please sign in to comment.