Skip to content

Commit

Permalink
make tar optional
Browse files Browse the repository at this point in the history
  • Loading branch information
JordanLaserGit committed Jan 4, 2024
1 parent 960b185 commit 711c3b5
Showing 1 changed file with 47 additions and 41 deletions.
88 changes: 47 additions & 41 deletions forcingprocessor/src/forcingprocessor/forcingprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ def prep_ngen_data(conf):
global ii_verbose
ii_verbose = conf["run"].get("verbose",False)
ii_collect_stats = conf["run"].get("collect_stats",True)
ii_tar = conf["run"].get("ii_tar",True)
proc_threads = conf["run"].get("proc_threads",None)
write_threads = conf["run"].get("write_threads",None)
nfile_chunk = conf["run"].get("nfile_chunk",None)
Expand Down Expand Up @@ -579,7 +580,7 @@ def prep_ngen_data(conf):
jnwm_files = nwm_forcing_files[start:end]
t0 = time.perf_counter()
if ii_verbose: print(f'Entering data extraction...\n')
# [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, var_list, fs)
# [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, fs)
data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_threads,crosswalk_dict,fs)
t_extract = time.perf_counter() - t0
complexity = (nfiles_tot * ncatchments) / 10000
Expand Down Expand Up @@ -744,48 +745,50 @@ def prep_ngen_data(conf):

meta_time = time.perf_counter() - t000

if ii_verbose: print(f'\nWriting tarball...')

if storage_type.lower() == 's3':
path = "/metadata/forcings_metadata/"
combined_tar_filename = 'forcings.tar.gz'
else:
path = str(metaf_path)
combined_tar_filename = str(forcing_path) + '/forcings.tar.gz'
with tarfile.open(combined_tar_filename, 'w:gz') as combined_tar:
if ii_collect_stats:
buf = BytesIO()

filename = f"metadata." + output_file_type
metadata_df.to_csv(buf, index=False)
buf.seek(0)
tarinfo = tarfile.TarInfo(name=path + filename)
tarinfo.size = len(buf.getvalue())
combined_tar.addfile(tarinfo, fileobj=buf)
if ii_tar:
if ii_verbose: print(f'\nWriting tarball...')
t0000 = time.perf_counter()
if storage_type.lower() == 's3':
path = "/metadata/forcings_metadata/"
combined_tar_filename = 'forcings.tar.gz'
else:
path = str(metaf_path)
combined_tar_filename = str(forcing_path) + '/forcings.tar.gz'
with tarfile.open(combined_tar_filename, 'w:gz') as combined_tar:
if ii_collect_stats:
buf = BytesIO()

filename = f"metadata." + output_file_type
metadata_df.to_csv(buf, index=False)
buf.seek(0)
tarinfo = tarfile.TarInfo(name=path + filename)
tarinfo.size = len(buf.getvalue())
combined_tar.addfile(tarinfo, fileobj=buf)

filename = f"catchments_avg." + output_file_type
avg_df.to_csv(buf, index=False)
buf.seek(0)
tarinfo = tarfile.TarInfo(name=path + filename)
tarinfo.size = len(buf.getvalue())
combined_tar.addfile(tarinfo, fileobj=buf)
filename = f"catchments_avg." + output_file_type
avg_df.to_csv(buf, index=False)
buf.seek(0)
tarinfo = tarfile.TarInfo(name=path + filename)
tarinfo.size = len(buf.getvalue())
combined_tar.addfile(tarinfo, fileobj=buf)

filename = f"catchments_median." + output_file_type
med_df.to_csv(buf, index=False)
buf.seek(0)
tarinfo = tarfile.TarInfo(name=path + filename)
tarinfo.size = len(buf.getvalue())
combined_tar.addfile(tarinfo, fileobj=buf)

for j, jdf in enumerate(dfs):
jfilename = filenames[j]
with tempfile.NamedTemporaryFile() as tmpfile:
if output_file_type == "parquet":
jdf.to_parquet(tmpfile.name, index=False)
elif output_file_type == "csv":
jdf.to_csv(tmpfile.name, index=False)

combined_tar.add(tmpfile.name, arcname=jfilename)
filename = f"catchments_median." + output_file_type
med_df.to_csv(buf, index=False)
buf.seek(0)
tarinfo = tarfile.TarInfo(name=path + filename)
tarinfo.size = len(buf.getvalue())
combined_tar.addfile(tarinfo, fileobj=buf)

for j, jdf in enumerate(dfs):
jfilename = filenames[j]
with tempfile.NamedTemporaryFile() as tmpfile:
if output_file_type == "parquet":
jdf.to_parquet(tmpfile.name, index=False)
elif output_file_type == "csv":
jdf.to_csv(tmpfile.name, index=False)

combined_tar.add(tmpfile.name, arcname=jfilename)
tar_time = time.perf_counter() - t0000

if storage_type == 'S3':
with open(combined_tar_filename, 'rb') as combined_tar:
Expand All @@ -802,6 +805,9 @@ def prep_ngen_data(conf):
if ii_collect_stats:
runtime += meta_time
msg += f"\nCollect stats : {meta_time:.2f}s"
if ii_tar:
runtime += tar_time
msg += f"\nWrite tar : {tar_time:.2f}s"
msg += f"\nRuntime : {runtime:.2f}s\n"
print(msg)

Expand Down

0 comments on commit 711c3b5

Please sign in to comment.