Skip to content

Commit

Permalink
adding s3 paths to config (#31)
Browse files Browse the repository at this point in the history
* adding s3 paths to config

* added a new data type for creating graphs
  • Loading branch information
taddyb authored Dec 18, 2024
1 parent fa1e6e1 commit 8bdc25d
Show file tree
Hide file tree
Showing 17 changed files with 3,211 additions and 109 deletions.
116 changes: 23 additions & 93 deletions marquette/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,35 @@ def main(cfg: DictConfig) -> None:
"""
if cfg.name.lower() == "hydrofabric":
raise ImportError("Hydrofabric functionality not yet supported")
elif cfg.name.lower() == "merit_s3":
from marquette.merit_s3.create import (create_edges, create_N, create_TMs, write_streamflow)

start = time.perf_counter()
log.info(f"Creating MERIT S3 {cfg.zone} River Graph")
edges = create_edges(cfg)

log.info(f"Creating MERIT S3 {cfg.zone} Connectivity Matrix (N) for gages")
create_N(cfg, edges)

log.info(f"Mapping {cfg.zone} Streamflow to S3 TMs")
create_TMs(cfg, edges)

log.info("Converting Streamflow to S3 DataTree")
write_streamflow(cfg, edges)

end = time.perf_counter()
log.info(f"Extracting data took : {(end - start):.6f} seconds")

elif cfg.name.lower() == "merit":
from marquette.merit.create import (create_edges, create_N, create_TMs,
map_lake_points, write_streamflow)
map_lake_points, write_streamflow, run_extensions)

start = time.perf_counter()
log.info(f"Creating MERIT {cfg.zone} River Graph")
edges = create_edges(cfg)

# log.info(f"Creating MERIT {cfg.zone} Connectivity Matrix (N) for gages")
# create_N(cfg, edges)
log.info(f"Creating MERIT {cfg.zone} Connectivity Matrix (N) for gages")
create_N(cfg, edges)

log.info(f"Mapping {cfg.zone} Streamflow to TMs")
create_TMs(cfg, edges)
Expand All @@ -52,94 +71,5 @@ def main(cfg: DictConfig) -> None:
log.error(f"incorrect name specified: {cfg.name}")


def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
"""
The function for running post-processing data extensions
:param cfg: Configuration object.
:type cfg: DictConfig
:return: None
"""
if "soils_data" in cfg.extensions:
from marquette.merit.extensions import soils_data

log.info("Adding soils information to your MERIT River Graph")
if "ksat" in edges:
log.info("soils information already exists in zarr format")
else:
soils_data(cfg, edges)
if "pet_forcing" in cfg.extensions:
from marquette.merit.extensions import pet_forcing

log.info("Adding PET forcing to your MERIT River Graph")
if "pet" in edges:
log.info("PET forcing already exists in zarr format")
else:
pet_forcing(cfg, edges)
if "temp_mean" in cfg.extensions:
from marquette.merit.extensions import temp_forcing

log.info("Adding temp_mean forcing to your MERIT River Graph")
if "temp_mean" in edges:
log.info("Temp_mean forcing already exists in zarr format")
else:
temp_forcing(cfg, edges)
if "global_dhbv_static_inputs" in cfg.extensions:
from marquette.merit.extensions import global_dhbv_static_inputs

log.info("Adding global dHBV static input data to your MERIT River Graph")
if "aridity" in edges:
log.info("global_dhbv_static_inputs already exists in zarr format")
else:
global_dhbv_static_inputs(cfg, edges)

if "incremental_drainage_area" in cfg.extensions:
from marquette.merit.extensions import \
calculate_incremental_drainage_area

log.info("Adding edge/catchment area input data to your MERIT River Graph")
if "incremental_drainage_area" in edges:
log.info("incremental_drainage_area already exists in zarr format")
else:
calculate_incremental_drainage_area(cfg, edges)

if "q_prime_sum" in cfg.extensions:
from marquette.merit.extensions import calculate_q_prime_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "summed_q_prime" in edges:
log.info("q_prime_sum already exists in zarr format")
else:
calculate_q_prime_summation(cfg, edges)


if "upstream_basin_avg_mean_p" in cfg.extensions:
from marquette.merit.extensions import calculate_mean_p_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "upstream_basin_avg_mean_p" in edges:
log.info("upstream_basin_avg_mean_p already exists in zarr format")
else:
calculate_mean_p_summation(cfg, edges)

# if "q_prime_sum_stats" in cfg.extensions:
# from marquette.merit.extensions import calculate_q_prime_sum_stats

# log.info("Adding q_prime_sum statistics to your MERIT River Graph")
# if "summed_q_prime_median" in edges:
# log.info("q_prime_sum statistics already exists in zarr format")
# else:
# calculate_q_prime_sum_stats(cfg, edges)

if "lstm_stats" in cfg.extensions:
from marquette.merit.extensions import format_lstm_forcings

log.info("Adding lstm statistics from global LSTM to your MERIT River Graph")
if "precip_comid" in edges:
log.info("q_prime_sum statistics already exists in zarr format")
else:
format_lstm_forcings(cfg, edges)


if __name__ == "__main__":
main()
main() # type: ignore
33 changes: 17 additions & 16 deletions marquette/conf/config.yaml
Original file line number Diff line number Diff line change
@@ -1,37 +1,38 @@
name: MERIT
data_path: /projects/mhpi/data/${name}
zone: 71
name: merit_s3
s3_path: s3://mhpi-spatial/marquette/${name}/
data_path: /projects/mhpi/data/MERIT
zone: 73
gpu: 6
create_edges:
buffer: 0.3334
dx: 2000
edges: ${data_path}/zarr/graph/CONUS/edges/
flowlines: ${data_path}/raw/flowlines
edges: ${s3_path}/edges/
flowlines: ${data_path}/raw/flowlines/riv_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_N:
run_whole_zone: False
drainage_area_treshold: 0.1
filter_based_on_dataset: True
gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gnn_dataset_v1_2.shp
gage_coo_indices: ${data_path}/zarr/gage_coo_indices
pad_gage_id: False
obs_dataset: ${data_path}/gage_information/obs_csvs/GRDC_point_data.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gnn_formatted_basins.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/subzones.csv
gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gage_9322_intersection.shp
gage_coo_indices: ${s3_path}/gages/coo_pair_intersections
pad_gage_id: True
obs_dataset: ${data_path}/gage_information/obs_csvs/all_gages_info.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/all_gages_info_combined.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/${zone}_all.csv
create_TMs:
MERIT:
save_sparse: True
TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
TM: ${s3_path}/TMs/sparse_MERIT_FLOWLINES_${zone}
shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_streamflow:
version: merit_conus_v6.18_snow
data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
data_store: ${s3_path}/streamflow/${create_streamflow.version}/${zone}
obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
predictions: /projects/mhpi/yxs275/DM_output/water_loss_model/dPL_local_daymet_new_attr_RMSEloss_with_log_2800
start_date: 01-01-1980
end_date: 12-31-2020
map_lake_points:
lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
# map_lake_points:
# lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
# zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
extensions:
- soils_data
- pet_forcing
Expand Down
63 changes: 63 additions & 0 deletions marquette/conf/saved_configs/v1.0config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
name: MERIT
data_path: /projects/mhpi/data/${name}
zone: 74
gpu: 6
create_edges:
buffer: 0.3334
dx: 2000
edges: ${data_path}/zarr/graph/CONUS/edges/
flowlines: ${data_path}/raw/flowlines
create_N:
run_whole_zone: False
drainage_area_treshold: 0.1
filter_based_on_dataset: True
gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gnn_dataset_v1_2.shp
gage_coo_indices: ${data_path}/zarr/gage_coo_indices
pad_gage_id: False
obs_dataset: ${data_path}/gage_information/obs_csvs/GRDC_point_data.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gnn_formatted_basins.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/subzones.csv
create_TMs:
MERIT:
save_sparse: True
TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_streamflow:
version: merit_conus_v6.18_snow
data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
predictions: /projects/mhpi/yxs275/DM_output/water_loss_model/dPL_local_daymet_new_attr_RMSEloss_with_log_2800
start_date: 01-01-1980
end_date: 12-31-2020
map_lake_points:
lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
extensions:
- soils_data
- pet_forcing
- global_dhbv_static_inputs
- incremental_drainage_area
- q_prime_sum
- upstream_basin_avg_mean_p
- q_prime_sum_stats
- lstm_stats
- temp_mean
# Hydra Config ------------------------------------------------------------------------#
hydra:
help:
app_name: marquette
header: == ${hydra.help.app_name} ==
template: |-
${hydra.help.header}
A data pipeline tool used to generate inputs to dMC river routing
By Tadd Bindas

${hydra.help.footer}
footer: |-
Powered by Hydra (https://hydra.cc)
Use --hydra-help to view Hydra specific help
job:
name: ${name}
run:
dir: ../runs/${hydra.job.name}_${zone}/${now:%Y-%m-%d_%H-%M-%S}
89 changes: 89 additions & 0 deletions marquette/merit/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,92 @@ def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None:
else:
log.info("Mapping HydroLakes Pour Points to Edges")
_map_lake_points(cfg, edges)


def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
"""
The function for running post-processing data extensions
:param cfg: Configuration object.
:type cfg: DictConfig
:return: None
"""
if "soils_data" in cfg.extensions:
from marquette.merit.extensions import soils_data

log.info("Adding soils information to your MERIT River Graph")
if "ksat" in edges:
log.info("soils information already exists in zarr format")
else:
soils_data(cfg, edges)
if "pet_forcing" in cfg.extensions:
from marquette.merit.extensions import pet_forcing

log.info("Adding PET forcing to your MERIT River Graph")
if "pet" in edges:
log.info("PET forcing already exists in zarr format")
else:
pet_forcing(cfg, edges)
if "temp_mean" in cfg.extensions:
from marquette.merit.extensions import temp_forcing

log.info("Adding temp_mean forcing to your MERIT River Graph")
if "temp_mean" in edges:
log.info("Temp_mean forcing already exists in zarr format")
else:
temp_forcing(cfg, edges)
if "global_dhbv_static_inputs" in cfg.extensions:
from marquette.merit.extensions import global_dhbv_static_inputs

log.info("Adding global dHBV static input data to your MERIT River Graph")
if "aridity" in edges:
log.info("global_dhbv_static_inputs already exists in zarr format")
else:
global_dhbv_static_inputs(cfg, edges)

if "incremental_drainage_area" in cfg.extensions:
from marquette.merit.extensions import \
calculate_incremental_drainage_area

log.info("Adding edge/catchment area input data to your MERIT River Graph")
if "incremental_drainage_area" in edges:
log.info("incremental_drainage_area already exists in zarr format")
else:
calculate_incremental_drainage_area(cfg, edges)

if "q_prime_sum" in cfg.extensions:
from marquette.merit.extensions import calculate_q_prime_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "summed_q_prime" in edges:
log.info("q_prime_sum already exists in zarr format")
else:
calculate_q_prime_summation(cfg, edges)


if "upstream_basin_avg_mean_p" in cfg.extensions:
from marquette.merit.extensions import calculate_mean_p_summation

log.info("Adding q_prime_sum to your MERIT River Graph")
if "upstream_basin_avg_mean_p" in edges:
log.info("upstream_basin_avg_mean_p already exists in zarr format")
else:
calculate_mean_p_summation(cfg, edges)

# if "q_prime_sum_stats" in cfg.extensions:
# from marquette.merit.extensions import calculate_q_prime_sum_stats

# log.info("Adding q_prime_sum statistics to your MERIT River Graph")
# if "summed_q_prime_median" in edges:
# log.info("q_prime_sum statistics already exists in zarr format")
# else:
# calculate_q_prime_sum_stats(cfg, edges)

if "lstm_stats" in cfg.extensions:
from marquette.merit.extensions import format_lstm_forcings

log.info("Adding lstm statistics from global LSTM to your MERIT River Graph")
if "precip_comid" in edges:
log.info("q_prime_sum statistics already exists in zarr format")
else:
format_lstm_forcings(cfg, edges)
Loading

0 comments on commit 8bdc25d

Please sign in to comment.