From ef1f4a5d76dbcbb5102183fbc6ace2948ec8d553 Mon Sep 17 00:00:00 2001 From: Jordan Laser Date: Fri, 5 Jan 2024 12:13:10 -0700 Subject: [PATCH] fix tar command --- .../src/forcingprocessor/forcingprocessor.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/forcingprocessor/src/forcingprocessor/forcingprocessor.py b/forcingprocessor/src/forcingprocessor/forcingprocessor.py index 590ddda9..bcab5506 100644 --- a/forcingprocessor/src/forcingprocessor/forcingprocessor.py +++ b/forcingprocessor/src/forcingprocessor/forcingprocessor.py @@ -300,6 +300,8 @@ def multiprocess_write(data,t_ax,catchments,nprocs,output_bucket,out_path,ii_app dfs.append(results[1]) filenames.append(results[2]) + print(f'Gathering data from write processes...') + if len(ids) > 1: flat_ids = [item for sublist in ids for item in sublist] flat_dfs = [item for sublist in dfs for item in sublist] @@ -352,14 +354,15 @@ def write_data( key = (out_path/f"cat-{cat_id}.csv").resolve() df_bucket = pd.read_csv(s3_client.get_object(Bucket = bucket, Key = key).get("Body")) df = pd.concat([df_bucket,df]) - del df_bucket - - dfs.append(df) + del df_bucket if storage_type.lower() == 's3': buf = BytesIO() filename = f"cat-{cat_id}." + output_file_type + dfs.append(df) + filenames.append(str(Path(filename).name)) + if output_file_type == "parquet": df.to_parquet(buf, index=False) elif output_file_type == "csv": @@ -377,9 +380,7 @@ def write_data( if output_file_type == "parquet": df.to_parquet(filename, index=False) elif output_file_type == "csv": - df.to_csv(filename, index=False) - - filenames.append(str(Path(filename).name)) + df.to_csv(filename, index=False) if j == 0: if storage_type.lower() == 's3': @@ -789,11 +790,12 @@ def prep_ngen_data(conf): combined_tar.add(tmpfile.name, arcname=jfilename) else: - del dfs + del dfs, filenames, forcing_cat_ids path = str(metaf_path) + meta_rel_forcings = forcing_path / ".." / "/metadata/forcings_metadata/" combined_tar_filename = str(forcing_path) + '/forcings.tar.gz' - tar_cmd = f'tar -czvf {combined_tar_filename} -C {forcing_path} .' - if ii_collect_stats: tar_cmd += f' -C {metaf_path} .' + tar_cmd = f'tar -czf {combined_tar_filename} -C {forcing_path} .' + if ii_collect_stats: tar_cmd += f' -C {meta_rel_forcings} .' os.system(tar_cmd) tar_time = time.perf_counter() - t0000