diff --git a/forcingprocessor/src/forcingprocessor/forcingprocessor.py b/forcingprocessor/src/forcingprocessor/forcingprocessor.py index b5893f5e..ce6b05ad 100644 --- a/forcingprocessor/src/forcingprocessor/forcingprocessor.py +++ b/forcingprocessor/src/forcingprocessor/forcingprocessor.py @@ -455,6 +455,7 @@ def prep_ngen_data(conf): global ii_verbose ii_verbose = conf["run"].get("verbose",False) ii_collect_stats = conf["run"].get("collect_stats",True) + ii_tar = conf["run"].get("ii_tar",True) proc_threads = conf["run"].get("proc_threads",None) write_threads = conf["run"].get("write_threads",None) nfile_chunk = conf["run"].get("nfile_chunk",None) @@ -579,7 +580,7 @@ def prep_ngen_data(conf): jnwm_files = nwm_forcing_files[start:end] t0 = time.perf_counter() if ii_verbose: print(f'Entering data extraction...\n') - # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, var_list, fs) + # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs) data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_threads,crosswalk_dict,fs) t_extract = time.perf_counter() - t0 complexity = (nfiles_tot * ncatchments) / 10000 @@ -744,48 +745,50 @@ def prep_ngen_data(conf): meta_time = time.perf_counter() - t000 - if ii_verbose: print(f'\nWriting tarball...') - - if storage_type.lower() == 's3': - path = "/metadata/forcings_metadata/" - combined_tar_filename = 'forcings.tar.gz' - else: - path = str(metaf_path) - combined_tar_filename = str(forcing_path) + '/forcings.tar.gz' - with tarfile.open(combined_tar_filename, 'w:gz') as combined_tar: - if ii_collect_stats: - buf = BytesIO() - - filename = f"metadata." + output_file_type - metadata_df.to_csv(buf, index=False) - buf.seek(0) - tarinfo = tarfile.TarInfo(name=path + filename) - tarinfo.size = len(buf.getvalue()) - combined_tar.addfile(tarinfo, fileobj=buf) + if ii_tar: + if ii_verbose: print(f'\nWriting tarball...') + t0000 = time.perf_counter() + if storage_type.lower() == 's3': + path = "/metadata/forcings_metadata/" + combined_tar_filename = 'forcings.tar.gz' + else: + path = str(metaf_path) + combined_tar_filename = str(forcing_path) + '/forcings.tar.gz' + with tarfile.open(combined_tar_filename, 'w:gz') as combined_tar: + if ii_collect_stats: + buf = BytesIO() + + filename = f"metadata." + output_file_type + metadata_df.to_csv(buf, index=False) + buf.seek(0) + tarinfo = tarfile.TarInfo(name=path + filename) + tarinfo.size = len(buf.getvalue()) + combined_tar.addfile(tarinfo, fileobj=buf) - filename = f"catchments_avg." + output_file_type - avg_df.to_csv(buf, index=False) - buf.seek(0) - tarinfo = tarfile.TarInfo(name=path + filename) - tarinfo.size = len(buf.getvalue()) - combined_tar.addfile(tarinfo, fileobj=buf) + filename = f"catchments_avg." + output_file_type + avg_df.to_csv(buf, index=False) + buf.seek(0) + tarinfo = tarfile.TarInfo(name=path + filename) + tarinfo.size = len(buf.getvalue()) + combined_tar.addfile(tarinfo, fileobj=buf) - filename = f"catchments_median." + output_file_type - med_df.to_csv(buf, index=False) - buf.seek(0) - tarinfo = tarfile.TarInfo(name=path + filename) - tarinfo.size = len(buf.getvalue()) - combined_tar.addfile(tarinfo, fileobj=buf) - - for j, jdf in enumerate(dfs): - jfilename = filenames[j] - with tempfile.NamedTemporaryFile() as tmpfile: - if output_file_type == "parquet": - jdf.to_parquet(tmpfile.name, index=False) - elif output_file_type == "csv": - jdf.to_csv(tmpfile.name, index=False) - - combined_tar.add(tmpfile.name, arcname=jfilename) + filename = f"catchments_median." + output_file_type + med_df.to_csv(buf, index=False) + buf.seek(0) + tarinfo = tarfile.TarInfo(name=path + filename) + tarinfo.size = len(buf.getvalue()) + combined_tar.addfile(tarinfo, fileobj=buf) + + for j, jdf in enumerate(dfs): + jfilename = filenames[j] + with tempfile.NamedTemporaryFile() as tmpfile: + if output_file_type == "parquet": + jdf.to_parquet(tmpfile.name, index=False) + elif output_file_type == "csv": + jdf.to_csv(tmpfile.name, index=False) + + combined_tar.add(tmpfile.name, arcname=jfilename) + tar_time = time.perf_counter() - t0000 if storage_type == 'S3': with open(combined_tar_filename, 'rb') as combined_tar: @@ -802,6 +805,9 @@ def prep_ngen_data(conf): if ii_collect_stats: runtime += meta_time msg += f"\nCollect stats : {meta_time:.2f}s" + if ii_tar: + runtime += tar_time + msg += f"\nWrite tar : {tar_time:.2f}s" msg += f"\nRuntime : {runtime:.2f}s\n" print(msg)