added support for merit_conus_v3.0 #21

Merged 2 commits on Apr 18, 2024
2 changes: 1 addition & 1 deletion marquette/__main__.py
@@ -84,7 +84,7 @@ def run_extensions(cfg, edges):
log.info("global_dhbv_static_inputs already exists in zarr format")
else:
global_dhbv_static_inputs(cfg, edges)

if "incremental_drainage_area" in cfg.extensions:
from marquette.merit.extensions import calculate_incremental_drainage_area

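For context, the hook added above follows the existing dispatch pattern in run_extensions: each name listed under cfg.extensions triggers a lazy import followed by a call. A minimal sketch (the invocation after the import is cut off by the diff, so the exact call is assumed from the pattern of the other extensions):

def run_extensions(cfg, edges):
    if "incremental_drainage_area" in cfg.extensions:
        # lazy import keeps optional extension code off the default path
        from marquette.merit.extensions import calculate_incremental_drainage_area

        calculate_incremental_drainage_area(cfg, edges)  # assumed signature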
12 changes: 6 additions & 6 deletions marquette/conf/config.yaml
@@ -1,6 +1,6 @@
name: MERIT
data_path: /projects/mhpi/data/${name}
zone: 75
zone: 73
create_edges:
buffer: 0.3334
dx: 2000
@@ -13,19 +13,19 @@ create_N:
gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gage_9322_intersection.shp
gage_coo_indices: ${data_path}/zarr/gage_coo_indices
pad_gage_id: True
obs_dataset: ${data_path}/gage_information/obs_csvs/gages3000Info.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gages_3000_merit_info.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/${zone}.csv
obs_dataset: ${data_path}/gage_information/obs_csvs/all_gages_info.csv
obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/all_gages_info_combined.csv
zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/${zone}_all.csv
create_TMs:
MERIT:
save_sparse: True
TM: ${data_path}/zarr/TMs/MERIT_FLOWLINES_${zone}
shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
create_streamflow:
version: merit_conus_v1.1
version: merit_conus_v3.0
data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
predictions: /projects/mhpi/hjj5218/model_output/conus_zarr/${zone}
predictions: /projects/mhpi/yxs275/DM_output/dPL_local_daymet_new_attr_water_loss_v3_all_merit
start_date: 01-01-1980
end_date: 12-31-2020
extensions:
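The ${...} values in this file are Hydra/OmegaConf interpolations resolved against the config root, so the version bump above propagates into data_store automatically. A small sketch of how the updated keys compose (toy script limited to the keys visible in this diff):

from omegaconf import OmegaConf

cfg = OmegaConf.create(
    {
        "name": "MERIT",
        "data_path": "/projects/mhpi/data/${name}",
        "zone": 73,
        "create_streamflow": {
            "version": "merit_conus_v3.0",
            "data_store": "${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}",
        },
    }
)
print(cfg.create_streamflow.data_store)
# /projects/mhpi/data/MERIT/streamflow/zarr/merit_conus_v3.0/73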
2 changes: 1 addition & 1 deletion marquette/merit/_connectivity_matrix.py
@@ -189,7 +189,7 @@ def find_closest_edge(

columns = [
"STAID",
"STANAME",
# "STANAME",
# "MERIT_ZONE",
"HUC02",
"DRAIN_SQKM",
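Commenting out STANAME removes it from the columns that find_closest_edge pulls from the gage table, presumably because the new all-gages CSV does not carry station names. A hedged sketch of the effect (the file name comes from the config change above; the column list past DRAIN_SQKM is truncated in the diff, so it is partial here):

import pandas as pd

columns = ["STAID", "HUC02", "DRAIN_SQKM"]  # "STANAME" and "MERIT_ZONE" now excluded
gages = pd.read_csv("all_gages_info.csv", usecols=columns, dtype={"STAID": str})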
27 changes: 24 additions & 3 deletions marquette/merit/_streamflow_conversion_functions.py
@@ -125,10 +125,31 @@ def calculate_merit_flow(cfg: DictConfig, edges: zarr.hierarchy.Group) -> None:
streamflow_predictions_root = zarr.open(
Path(cfg.create_streamflow.predictions), mode="r"
)
log.info("Reading Zarr Store")
file_runoff = np.transpose(streamflow_predictions_root.Runoff)

streamflow_comids: np.ndarray = streamflow_predictions_root.COMID[:].astype(int)
# Different MERIT forward runs have different save outputs; the major version number determines how to read them
version = int(
cfg.create_streamflow.version.lower().split("_v")[1][0]
) # getting the version number
if version >= 3:
log.info(msg="Reading Zarr Store")
zone_keys = [
key for key in streamflow_predictions_root.keys() if str(cfg.zone) in key
]
zone_comids = []
zone_runoff = []
for key in zone_keys:
zone_comids.append(streamflow_predictions_root[key].COMID[:])
zone_runoff.append(streamflow_predictions_root[key].Qr[:])
streamflow_comids = np.concatenate(zone_comids).astype(int)
file_runoff = np.transpose(np.concatenate(zone_runoff))
del zone_comids
del zone_runoff

else:
log.info("Reading Zarr Store")
file_runoff = np.transpose(streamflow_predictions_root.Runoff)

streamflow_comids: np.ndarray = streamflow_predictions_root.COMID[:].astype(int)

log.info("Mapping predictions to zone COMIDs")
runoff_full_zone = np.zeros((file_runoff.shape[0], edge_comids.shape[0]))
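One caveat on the version parse above: split("_v")[1][0] keeps only the first character after "_v", so merit_conus_v3.0 parses as 3 but a hypothetical merit_conus_v10.0 would parse as 1. A regex-based alternative (illustrative only, not part of the PR):

import re

def parse_major_version(version: str) -> int:
    """Extract the major version from tags like 'merit_conus_v3.0'."""
    match = re.search(r"_v(\d+)", version.lower())
    if match is None:
        raise ValueError(f"no '_v<major>' tag in {version!r}")
    return int(match.group(1))

assert parse_major_version("merit_conus_v3.0") == 3
assert parse_major_version("merit_conus_v10.0") == 10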
55 changes: 29 additions & 26 deletions marquette/merit/extensions.py
@@ -228,12 +228,12 @@ def global_dhbv_static_inputs(cfg: DictConfig, edges: zarr.Group) -> None:
edges.array(name="mean_p", data=mean_p_arr[mapping])
edges.array(name="mean_elevation", data=mean_elevation_arr[mapping])
edges.array(name="glacier", data=glacier_arr[mapping])


def calculate_incremental_drainage_area(cfg: DictConfig, edges: zarr.Group) -> None:
"""
Runs a Polars query to calculate the incremental drainage area for each edge in the MERIT dataset
"""
"""
basin_file = (
Path(cfg.data_path)
/ f"raw/basins/cat_pfaf_{cfg.zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp"
@@ -243,31 +243,34 @@ def calculate_incremental_drainage_area(cfg: DictConfig, edges: zarr.Group) -> None:
gdf = gpd.read_file(basin_file)
_df = pd.DataFrame(gdf.drop(columns="geometry"))
df = pl.from_pandas(_df)
edges_df = pl.DataFrame({"COMID": edges.merit_basin[:], "id": edges.id[:], "order": np.arange(edges.id.shape[0])})
edges_df = pl.DataFrame(
{
"COMID": edges.merit_basin[:],
"id": edges.id[:],
"order": np.arange(edges.id.shape[0]),
}
)

result = df.lazy().join(
other=edges_df.lazy(),
left_on="COMID",
right_on="COMID",
how="left"
).group_by(
by="COMID",
).agg([
pl.map_groups(
exprs=["unitarea", pl.first("unitarea")],
function=lambda list_of_series:
list_of_series[1] / list_of_series[0].shape[0]
).alias("incremental_drainage_area")
]).join(
other=edges_df.lazy(),
left_on="COMID",
right_on="COMID",
how="left"
).sort(
by="order"
).collect()
result = (
df.lazy()
.join(other=edges_df.lazy(), left_on="COMID", right_on="COMID", how="left")
.group_by(
by="COMID",
)
.agg(
[
pl.map_groups(
exprs=["unitarea", pl.first("unitarea")],
function=lambda list_of_series: list_of_series[1]
/ list_of_series[0].shape[0],
).alias("incremental_drainage_area")
]
)
.join(other=edges_df.lazy(), left_on="COMID", right_on="COMID", how="left")
.sort(by="order")
.collect()
)
edges.array(
name="incremental_drainage_area",
data=result.select(pl.col("incremental_drainage_area")).to_numpy().squeeze(),
)
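In miniature, the query above joins basins to edges on COMID, divides each catchment's unitarea evenly among the edges that reference it, then joins the per-COMID result back so every edge row carries its share. A toy equivalent, assuming a recent Polars where pl.len() gives the group size (the PR itself uses pl.map_groups for the same arithmetic):

import polars as pl

basins = pl.DataFrame({"COMID": [1, 2], "unitarea": [10.0, 6.0]})
edges_df = pl.DataFrame({"COMID": [1, 1, 2], "id": [0, 1, 2]})

per_basin = (
    basins.join(edges_df, on="COMID", how="left")
    .group_by("COMID")
    .agg((pl.first("unitarea") / pl.len()).alias("incremental_drainage_area"))
)
per_edge = edges_df.join(per_basin, on="COMID", how="left").sort("id")
# COMID 1 splits 10.0 across two edges (5.0 each); COMID 2 keeps 6.0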
