diff --git a/forcingprocessor/README.md b/forcingprocessor/README.md
index 503f0dc0..87533587 100644
--- a/forcingprocessor/README.md
+++ b/forcingprocessor/README.md
@@ -59,8 +59,8 @@ See the docker README for example run commands from the container.
 |-------------------|--------------------------------|----------|
 | verbose | Get print statements, defaults to false | :white_check_mark: |
 | collect_stats | Collect forcing metadata, defaults to true | :white_check_mark: |
-| proc_threads | Number of data processing threads, defaults to 80% available cores | |
-| write_threads | Number of writing threads, defaults to 100% available cores | |
+| proc_process | Number of data processing processes, defaults to 80% of available cores | |
+| write_process | Number of writing processes, defaults to 100% of available cores | |
 | nfile_chunk | Number of files to process each write, defaults to 1000000. Only set this if experiencing memory constraints due to large number of nwm forcing files | |
 
 ## nwm_file
diff --git a/forcingprocessor/src/forcingprocessor/forcingprocessor.py b/forcingprocessor/src/forcingprocessor/forcingprocessor.py
index 81625833..06ec92cb 100644
--- a/forcingprocessor/src/forcingprocessor/forcingprocessor.py
+++ b/forcingprocessor/src/forcingprocessor/forcingprocessor.py
@@ -457,12 +457,12 @@ def prep_ngen_data(conf):
     ii_verbose = conf["run"].get("verbose",False)
     ii_collect_stats = conf["run"].get("collect_stats",True)
     ii_tar = conf["run"].get("ii_tar",True)
-    proc_processs = conf["run"].get("proc_processs",None)
-    write_processs = conf["run"].get("write_processs",None)
+    proc_process = conf["run"].get("proc_process",None)
+    write_process = conf["run"].get("write_process",None)
     nfile_chunk = conf["run"].get("nfile_chunk",None)
 
-    if proc_processs is None: proc_processs = int(os.cpu_count() * 0.8)
-    if write_processs is None: write_processs = os.cpu_count()
+    if proc_process is None: proc_process = int(os.cpu_count() * 0.8)
+    if write_process is None: write_process = os.cpu_count()
     if nfile_chunk is None: nfile_chunk = 100000
 
     if ii_verbose:
@@ -582,22 +582,22 @@ def prep_ngen_data(conf):
         t0 = time.perf_counter()
         if ii_verbose: print(f'Entering data extraction...\n')
         # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
-        data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_processs,crosswalk_dict,fs)
+        data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_process,crosswalk_dict,fs)
         t_extract = time.perf_counter() - t0
         complexity = (nfiles_tot * ncatchments) / 10000
         score = complexity / t_extract
-        if ii_verbose: print(f'Data extract processs: {proc_processs:.2f}\nExtract time: {t_extract:.2f}\nComplexity: {complexity:.2f}\nScore: {score:.2f}\n', end=None)
+        if ii_verbose: print(f'Data extract processes: {proc_process:.2f}\nExtract time: {t_extract:.2f}\nComplexity: {complexity:.2f}\nScore: {score:.2f}\n', end=None)
 
         t0 = time.perf_counter()
         out_path = (output_path/'forcings/').resolve()
         if ii_verbose: print(f'Writing catchment forcings to {output_bucket} at {out_path}!', end=None)
-        forcing_cat_ids, dfs, filenames = multiprocess_write(data_array,t_ax,crosswalk_dict.keys(),write_processs,output_bucket,out_path,ii_append)
+        forcing_cat_ids, dfs, filenames = multiprocess_write(data_array,t_ax,crosswalk_dict.keys(),write_process,output_bucket,out_path,ii_append)
         ii_append = True
         write_time += time.perf_counter() - t0
         write_rate = ncatchments / write_time
-        if ii_verbose: print(f'\n\nWrite processs: {write_processs}\nWrite time: {write_time:.2f}\nWrite rate {write_rate:.2f} files/second\n', end=None)
+        if ii_verbose: print(f'\n\nWrite processes: {write_process}\nWrite time: {write_time:.2f}\nWrite rate {write_rate:.2f} files/second\n', end=None)
 
         loop_time = time.perf_counter() - t00
         if ii_verbose and nloops > 1: print(f'One loop took {loop_time:.2f} seconds. Estimated time to completion: {loop_time * (nloops - jloop):.2f}')
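For anyone updating an existing config, here is a minimal sketch of a `run` block using the renamed keys. The key names come from this diff; the surrounding structure and the example values are assumptions, and any other sections `prep_ngen_data` expects are omitted.

```python
# Hypothetical "run" block illustrating the keys renamed in this diff.
# Values are examples only; any key left out falls back to the defaults
# set in prep_ngen_data (80% of cores, all cores, 100000 files).
conf = {
    "run": {
        "verbose": True,        # print progress statements
        "collect_stats": True,  # collect forcing metadata
        "proc_process": 8,      # data-extraction worker processes
        "write_process": 16,    # writer processes
        "nfile_chunk": 100000,  # files processed per write
    }
}
```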