diff --git a/forcingprocessor/src/forcingprocessor/processor.py b/forcingprocessor/src/forcingprocessor/processor.py index fda54824..fda17b88 100644 --- a/forcingprocessor/src/forcingprocessor/processor.py +++ b/forcingprocessor/src/forcingprocessor/processor.py @@ -584,7 +584,6 @@ def write_netcdf(data, vpu, t_ax, catchments): if storage_type == 's3': bucket, key = convert_url2key(nc_filename,s3_client) - key = key[:-1] with tempfile.NamedTemporaryFile(suffix='.nc') as tmpfile: with nc.Dataset(tmpfile, 'w', format='NETCDF4') as ds: catchment_dim = ds.createDimension('catchment-id', len(catchments)) @@ -763,7 +762,7 @@ def prep_ngen_data(conf): meta_path = bucket_path + '/metadata' metaf_path = bucket_path + '/metadata/forcings_metadata' bucket, key = convert_url2key(metaf_path,storage_type) - conf_path = f"{key}conf_fp.json" + conf_path = f"{key}/conf_fp.json" filenamelist_path = f"{key}{os.path.basename(nwm_file)}" s3 = boto3.client("s3") s3.put_object( @@ -934,7 +933,7 @@ def prep_ngen_data(conf): if storage_type == 's3': bucket, key = convert_url2key(output_path,storage_type) - meta_path = f"{key}metadata/forcings_metadata/" + meta_path = f"{key}/metadata/forcings_metadata/" buf = BytesIO() if "parquet" in output_file_type: filename = f"metadata.parquet" @@ -996,7 +995,7 @@ def prep_ngen_data(conf): if storage_type == "s3": bucket, key = convert_url2key(metaf_path,storage_type) - log_path = key + 'profile_fp.txt' + log_path = key + '/profile_fp.txt' s3.upload_file( f'./profile_fp.txt', bucket, diff --git a/forcingprocessor/src/forcingprocessor/utils.py b/forcingprocessor/src/forcingprocessor/utils.py index ddd50fe2..2767f5da 100644 --- a/forcingprocessor/src/forcingprocessor/utils.py +++ b/forcingprocessor/src/forcingprocessor/utils.py @@ -69,7 +69,13 @@ def convert_url2key(nwm_file,fs_type): _nc_file_parts = nwm_file.split('/') layers = _nc_file_parts[3:] for jlay in layers: - bucket_key += jlay + "/" - - bucket = _nc_file_parts[2] + if jlay == layers[-1]: + bucket_key += jlay + else: + bucket_key += jlay + "/" + if fs_type == "google": + bucket = _nc_file_parts[3] + elif fs_type == 's3': + bucket = _nc_file_parts[2] + return bucket, bucket_key \ No newline at end of file diff --git a/forcingprocessor/tests/test_forcingprocessor.py b/forcingprocessor/tests/test_forcingprocessor.py index 2f304c18..d7590134 100644 --- a/forcingprocessor/tests/test_forcingprocessor.py +++ b/forcingprocessor/tests/test_forcingprocessor.py @@ -226,5 +226,6 @@ def test_muliple_weights(): conf['forcing']['gpkg_file'] = ["https://lynker-spatial.s3-us-west-2.amazonaws.com/hydrofabric/v2.1.1/nextgen/conus_forcing-weights/vpuid%3D01/part-0.parquet","https://lynker-spatial.s3-us-west-2.amazonaws.com/hydrofabric/v2.1.1/nextgen/conus_forcing-weights/vpuid%3D02/part-0.parquet"] generate_nwmfiles(nwmurl_conf_retro) prep_ngen_data(conf) + os.system(f"rm -rf {data_dir}") diff --git a/forcingprocessor/tests/test_hf2ds.py b/forcingprocessor/tests/test_hf2ds.py index 3e27630b..08e0ff9d 100644 --- a/forcingprocessor/tests/test_hf2ds.py +++ b/forcingprocessor/tests/test_hf2ds.py @@ -5,15 +5,12 @@ HF_VERSION="v2.1.1" test_dir = Path(__file__).parent data_dir = (test_dir/'data').resolve() -parq_name = "09_weights.parquet" -parq_path = os.path.join(data_dir,parq_name) -os.system(f"curl -o {parq_path} -L -O https://lynker-spatial.s3-us-west-2.amazonaws.com/hydrofabric/{HF_VERSION}/nextgen/conus_forcing-weights/vpuid%3D09/part-0.parquet") -geopackage_name = "palisade.gpkg" -gpkg_path = os.path.join(data_dir,geopackage_name) -os.system(f"curl -o {gpkg_path} -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/{geopackage_name}") out_parq = os.path.join(data_dir,"out.parquet") def test_parquet(): + parq_name = "09_weights.parquet" + parq_path = os.path.join(data_dir,parq_name) + os.system(f"curl -o {parq_path} -L -O https://lynker-spatial.s3-us-west-2.amazonaws.com/hydrofabric/{HF_VERSION}/nextgen/conus_forcing-weights/vpuid%3D09/part-0.parquet") weights,_ = hf2ds([parq_path]) assert len(weights) > 0 @@ -22,6 +19,9 @@ def test_parquet_lynker_spatial(): assert len(weights) > 0 def test_gpkg(): + geopackage_name = "palisade.gpkg" + gpkg_path = os.path.join(data_dir,geopackage_name) + os.system(f"curl -o {gpkg_path} -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/{geopackage_name}") weights,_ = hf2ds([gpkg_path]) assert len(weights) > 0 diff --git a/forcingprocessor/tests/test_plotter.py b/forcingprocessor/tests/test_plotter.py index dbc865f1..74cd6fb4 100644 --- a/forcingprocessor/tests/test_plotter.py +++ b/forcingprocessor/tests/test_plotter.py @@ -6,21 +6,22 @@ from forcingprocessor.plot_forcings import nc_to_3darray, plot_ngen_forcings, csvs_to_3darray, get_nwm_data_array from forcingprocessor.nwm_filenames_generator import generate_nwmfiles import requests +import pytest date = datetime.now() date = date.strftime('%Y%m%d') hourminute = '0000' test_dir = Path(__file__).parent -data_dir = (test_dir/'data').resolve() +DATA_DIR = (test_dir/'data').resolve() pwd = Path.cwd() filenamelist = str((pwd/"filenamelist.txt").resolve()) -geopackage = str(f"{data_dir}/palisade.gpkg") -if os.path.exists(data_dir): - os.system(f"rm -rf {data_dir}") -os.system(f"mkdir {data_dir}") +geopackage = str(f"{DATA_DIR}/palisade.gpkg") +if os.path.exists(DATA_DIR): + os.system(f"rm -rf {DATA_DIR}") +os.system(f"mkdir {DATA_DIR}") geopackage_name = "palisade.gpkg" -os.system(f"curl -o {os.path.join(data_dir,geopackage_name)} -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg") +os.system(f"curl -o {os.path.join(DATA_DIR,geopackage_name)} -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg") conf = { "forcing" : { @@ -29,7 +30,7 @@ }, "storage":{ - "output_path" : str(data_dir), + "output_path" : str(DATA_DIR), "output_file_type" : ["netcdf","csv"] }, @@ -63,6 +64,12 @@ "lead_time" : [1] } +@pytest.fixture +def clean_dir(autouse=True): + if os.path.exists(DATA_DIR): + os.system(f'rm -rf {str(DATA_DIR)}') + os.system(f'mkdir {str(DATA_DIR)}') + def test_forcings_plot(): nwmurl_conf['start_date'] = date + hourminute nwmurl_conf['end_date'] = date + hourminute @@ -71,7 +78,7 @@ def test_forcings_plot(): generate_nwmfiles(nwmurl_conf) prep_ngen_data(conf) - nwm_dir = os.path.join(data_dir,'nwm_forcings') + nwm_dir = os.path.join(DATA_DIR,'nwm_forcings') if not os.path.exists(nwm_dir): os.system(f"mkdir {nwm_dir}") @@ -90,7 +97,7 @@ def test_forcings_plot(): nwm_data = get_nwm_data_array(nwm_dir,geopackage) - forcings_nc = os.path.join(data_dir,"forcings/1_forcings.nc") + forcings_nc = os.path.join(DATA_DIR,"forcings/1_forcings.nc") ngen_data, t_ax, catchment_ids = nc_to_3darray(forcings_nc) plot_ngen_forcings( @@ -100,11 +107,12 @@ def test_forcings_plot(): t_ax, catchment_ids, ["TMP_2maboveground"], - os.path.join(data_dir,'metadata/GIFs') + os.path.join(DATA_DIR,'metadata/GIFs') ) - os.system(f'rm {forcings_nc}') - ngen_data, t_ax, catchment_ids = csvs_to_3darray(os.path.join(data_dir,'forcings')) + os.remove(forcings_nc) + os.system(f'rm -rf {str(DATA_DIR)}forcings/*.parquet') + ngen_data, t_ax, catchment_ids = csvs_to_3darray(os.path.join(DATA_DIR,'forcings')) plot_ngen_forcings( nwm_data, ngen_data, @@ -112,5 +120,5 @@ def test_forcings_plot(): t_ax, catchment_ids, ["TMP_2maboveground"], - os.path.join(data_dir,'metadata/GIFs') + os.path.join(DATA_DIR,'metadata/GIFs') ) \ No newline at end of file