Reservoir indices #25

Merged
merged 2 commits on Jul 7, 2024
16 changes: 8 additions & 8 deletions marquette/__main__.py
@@ -5,7 +5,7 @@
import zarr
from omegaconf import DictConfig

log = logging.getLogger(__name__)
log = logging.getLogger(name=__name__)


@hydra.main(
@@ -24,12 +24,8 @@ def main(cfg: DictConfig) -> None:
if cfg.name.lower() == "hydrofabric":
raise ImportError("Hydrofabric functionality not yet supported")
elif cfg.name.lower() == "merit":
from marquette.merit.create import (
create_edges,
create_N,
create_TMs,
write_streamflow,
)
from marquette.merit.create import (create_edges, create_N, create_TMs,
map_lake_points, write_streamflow)

start = time.perf_counter()
log.info(f"Creating MERIT {cfg.zone} River Graph")
@@ -44,6 +40,9 @@ def main(cfg: DictConfig) -> None:
log.info("Converting Streamflow to zarr")
write_streamflow(cfg, edges)

log.info("Mapping Lake Pour Points to Edges")
map_lake_points(cfg, edges)

log.info("Running Data Post-Processing Extensions")
run_extensions(cfg, edges)

@@ -87,7 +86,8 @@ def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
global_dhbv_static_inputs(cfg, edges)

if "incremental_drainage_area" in cfg.extensions:
from marquette.merit.extensions import calculate_incremental_drainage_area
from marquette.merit.extensions import \
calculate_incremental_drainage_area

log.info("Adding edge/catchment area input data to your MERIT River Graph")
if "incremental_drainage_area" in edges:
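Review note: marquette/__main__.py is the package entrypoint, so the new mapping step runs as part of a normal pipeline invocation. Assuming the Hydra config shipped in marquette/conf, a run would look something like this (illustrative; the zone value is a placeholder):

    python -m marquette name=merit zone=73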
3 changes: 3 additions & 0 deletions marquette/conf/config.yaml
@@ -28,6 +28,9 @@ create_streamflow:
predictions: /projects/mhpi/yxs275/DM_output/dPL_local_daymet_new_attr_water_loss_v6v10_random_batch_all_merit_forward/
start_date: 01-01-1980
end_date: 12-31-2020
map_lake_points:
lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp
zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr
extensions:
- soils_data
- pet_forcing
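Review note: the ${zone} token in lake_points is an OmegaConf interpolation, resolved against the top-level zone key at access time. A minimal sketch of that behavior (toy values, not the repo's actual paths):

    from omegaconf import OmegaConf

    # Toy config mirroring the structure above; "73" is a placeholder zone id.
    cfg = OmegaConf.create(
        {
            "zone": "73",
            "map_lake_points": {
                "lake_points": "/data/RIV_lake_intersection_${zone}.shp"
            },
        }
    )

    print(cfg.map_lake_points.lake_points)
    # /data/RIV_lake_intersection_73.shp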
62 changes: 62 additions & 0 deletions marquette/merit/_map_lake_points.py
@@ -0,0 +1,62 @@
import logging
from pathlib import Path

import geopandas as gpd
import numpy as np
import zarr
from omegaconf import DictConfig
from tqdm import tqdm

log = logging.getLogger(name=__name__)

def _map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None:
"""A function that reads in a gdf of hydrolakes information, finds its corresponding edge, then saves the data

Parameters
----------
cfg: DictConfig
The configuration object
edges: zarr.Group
The zarr group containing the edges
"""
data_path = Path(cfg.map_lake_points.lake_points)
if not data_path.exists():
msg = "Cannot find the lake points file"
log.exception(msg)
raise FileNotFoundError(msg)
gdf = gpd.read_file(data_path)
lake_comids = gdf["COMID"].astype(int).values
edges_comids: np.ndarray = edges["merit_basin"][:].astype(np.int32)  # type: ignore

hylak_id = np.full(len(edges_comids), -1, dtype=np.int32)
grand_id = np.full_like(edges_comids, -1, dtype=np.int32)
lake_area = np.full_like(edges_comids, -1, dtype=np.float32)
vol_total = np.full_like(edges_comids, -1, dtype=np.float32)
depth_avg = np.full_like(edges_comids, -1, dtype=np.float32)

for idx, lake_id in enumerate(tqdm(
lake_comids,
desc="Mapping Lake COMIDS to edges",
ncols=140,
ascii=True,
)):
jdx = np.where(edges_comids == lake_id)[0]
if not jdx.size:
log.info(f"No lake found for COMID {lake_id}")
else:
# Assumung the pour point is at the end of the COMID
edge_idx = jdx[-1]
lake_row = gdf.iloc[idx]
hylak_id[edge_idx] = lake_row["Hylak_id"]
grand_id[edge_idx] = lake_row["Grand_id"]
lake_area[edge_idx] = lake_row["Lake_area"]
vol_total[edge_idx] = lake_row["Vol_total"]
depth_avg[edge_idx] = lake_row["Depth_avg"]

edges.array("hylak_id", data=hylak_id)
edges.array("grand_id", data=grand_id)
edges.array("lake_area", data=lake_area)
edges.array("vol_total", data=vol_total)
edges.array("depth_avg", data=depth_avg)

log.info("Wrote Lake data for zones to zarr")
50 changes: 28 additions & 22 deletions marquette/merit/create.py
@@ -10,30 +10,19 @@
from omegaconf import DictConfig
from tqdm import tqdm

from marquette.merit._connectivity_matrix import (
create_gage_connectivity,
map_gages_to_zone,
new_zone_connectivity,
)
from marquette.merit._connectivity_matrix import (create_gage_connectivity,
map_gages_to_zone,
new_zone_connectivity)
from marquette.merit._edge_calculations import (
calculate_num_edges,
create_segment,
find_flowlines,
many_segment_to_edge_partition,
singular_segment_to_edge_partition,
sort_xarray_dataarray,
)
calculate_num_edges, create_segment, find_flowlines,
many_segment_to_edge_partition, singular_segment_to_edge_partition,
sort_xarray_dataarray)
from marquette.merit._map_lake_points import _map_lake_points
from marquette.merit._streamflow_conversion_functions import (
calculate_huc10_flow_from_individual_files,
calculate_merit_flow,
separate_basins,
)
from marquette.merit._TM_calculations import (
create_HUC_MERIT_TM,
create_MERIT_FLOW_TM,
# create_sparse_MERIT_FLOW_TM,
join_geospatial_data,
)
calculate_huc10_flow_from_individual_files, calculate_merit_flow,
separate_basins)
from marquette.merit._TM_calculations import ( # create_sparse_MERIT_FLOW_TM,
create_HUC_MERIT_TM, create_MERIT_FLOW_TM, join_geospatial_data)

log = logging.getLogger(__name__)

@@ -242,3 +231,20 @@ def create_TMs(cfg: DictConfig, edges: zarr.Group) -> None:
# create_sparse_MERIT_FLOW_TM(cfg, edges)
# else:
create_MERIT_FLOW_TM(cfg, edges)


def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None:
"""Maps HydroLAKES pour points to the corresponding edge

Parameters
----------
cfg: DictConfig
The configuration object
edges: zarr.Group
The zarr group containing the edges
"""
if "hylak_id" in edges:
log.info("HydroLakes Intersection already exists in Zarr format")
else:
log.info("Mapping HydroLakes Pour Points to Edges")
_map_lake_points(cfg, edges)
66 changes: 38 additions & 28 deletions marquette/merit/extensions.py
@@ -224,19 +224,19 @@ def calculate_incremental_drainage_area(cfg: DictConfig, edges: zarr.Group) -> None:
)


def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
"""Creates Q` summed data for all edges in a given MERIT zone

Parameters:
----------
cfg: DictConfig
The configuration object.
edges: zarr.Group
The edges group in the MERIT zone
"""
n = 1 # number of splits (used for reducing memory load)
n = 1 # number of splits (used for reducing memory load)
cp.cuda.runtime.setDevice(2) # manually setting the device to 2

streamflow_group = Path(
f"/projects/mhpi/data/MERIT/streamflow/zarr/{cfg.create_streamflow.version}/{cfg.zone}"
)
@@ -245,16 +245,15 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
streamflow_zarr: zarr.Group = zarr.open_group(streamflow_group, mode="r")
streamflow_time = streamflow_zarr.time[:]
# _, counts = cp.unique(edges.segment_sorting_index[:], return_counts=True) # type: ignore
dim_0 : int = streamflow_zarr.time.shape[0] # type: ignore
dim_1 : int = streamflow_zarr.COMID.shape[0] # type: ignore
dim_0: int = streamflow_zarr.time.shape[0] # type: ignore
dim_1: int = streamflow_zarr.COMID.shape[0] # type: ignore
edge_ids = np.array(edges.id[:])
edges_segment_sorting_index = cp.array(edges.segment_sorting_index[:])
edge_index_mapping = {v: i for i, v in enumerate(edge_ids)}

q_prime_np = np.zeros([dim_0, dim_1]).transpose(1, 0)
streamflow_data = cp.array(streamflow_zarr.streamflow[:])


# Generating a networkx DiGraph object
df_path = Path(f"{cfg.create_edges.edges}").parent / f"{cfg.zone}_graph_df.csv"
if df_path.exists():
@@ -264,47 +263,58 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
target = []
for idx, _ in enumerate(
tqdm(
edges.id[:],
desc="creating graph in dictionary form",
ascii=True,
ncols=140,
)
):
edges.id[:],
desc="creating graph in dictionary form",
ascii=True,
ncols=140,
)
):
if edges.ds[idx] != "0_0":
source.append(idx)
target.append(edge_index_mapping[edges.ds[idx]])
df = pd.DataFrame({"source": source, "target": target})
df.to_csv(df_path, index=False)
G = nx.from_pandas_edgelist(df=df, create_using=nx.DiGraph(),)

time_split = np.array_split(streamflow_time, n) #type: ignore
G = nx.from_pandas_edgelist(
df=df,
create_using=nx.DiGraph(),
)

time_split = np.array_split(streamflow_time, n) # type: ignore
for idx, time_range in enumerate(time_split):
q_prime_cp = cp.zeros([time_range.shape[0], dim_1]).transpose(1, 0)

for jdx, _ in enumerate(tqdm(
edge_ids,
desc=f"calculating q` sum data for part {idx}",
ascii=True,
ncols=140,
)):
for jdx, _ in enumerate(
tqdm(
edge_ids,
desc=f"calculating q` sum data for part {idx}",
ascii=True,
ncols=140,
)
):
streamflow_ds_id = edges_segment_sorting_index[jdx]
# num_edges_in_comid = counts[streamflow_ds_id]
try:
graph = nx.descendants(G, jdx, backend="cugraph")
graph.add(jdx) # Adding the idx to ensure it's counted
downstream_idx = np.array(list(graph)) # type: ignore
downstream_comid_idx = cp.unique(edges_segment_sorting_index[downstream_idx]) # type: ignore
q_prime_cp[downstream_comid_idx] += streamflow_data[time_range, streamflow_ds_id] # type: ignore
downstream_comid_idx = cp.unique(
edges_segment_sorting_index[downstream_idx]
) # type: ignore
q_prime_cp[downstream_comid_idx] += streamflow_data[
time_range, streamflow_ds_id
] # type: ignore
except nx.exception.NetworkXError:
# This means there is no connectivity from this basin. It's a one-node graph
q_prime_cp[streamflow_ds_id] = streamflow_data[time_range, streamflow_ds_id]

q_prime_cp[streamflow_ds_id] = streamflow_data[
time_range, streamflow_ds_id
]

print("Saving GPU Memory to CPU; freeing GPU Memory")
q_prime_np[:, time_range] = cp.asnumpy(q_prime_cp)
del q_prime_cp
cp.get_default_memory_pool().free_all_blocks()

edges.array(
name="summed_q_prime",
data=q_prime_np.transpose(1, 0),
)
)
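Review note: the hot loop above implements downstream accumulation: each edge's incremental flow is added to the edge itself plus everything nx.descendants reaches below it. A CPU-only toy of that pattern on a synthetic four-node graph (names are illustrative, not the module's API):

    import networkx as nx
    import numpy as np

    # Toy river network: nodes 0 and 1 drain into 2, which drains into outlet 3.
    G = nx.DiGraph([(0, 2), (1, 2), (2, 3)])
    incremental_q = np.array([1.0, 2.0, 0.5, 0.25])  # per-node incremental flow

    summed_q = np.zeros_like(incremental_q)
    for node in G.nodes:
        reach = nx.descendants(G, node) | {node}  # the node plus all downstream nodes
        summed_q[list(reach)] += incremental_q[node]

    print(summed_q)  # [1.0, 2.0, 3.5, 3.75]; the outlet accumulates everything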
8 changes: 2 additions & 6 deletions marquette/merit/map.py
@@ -12,12 +12,8 @@
from scipy.sparse import csr_matrix
from tqdm import tqdm

from marquette.merit._graph import (
Segment,
data_to_csv,
get_edge_counts,
segments_to_edges,
)
from marquette.merit._graph import (Segment, data_to_csv, get_edge_counts,
segments_to_edges)

log = logging.getLogger(__name__)

2 changes: 2 additions & 0 deletions requirements-cuda.txt
@@ -2,3 +2,5 @@
cudf-cu12==24.6.*
dask-cudf-cu12==24.6.*
cugraph-cu12==24.6.*
cuspatial-cu12==24.6.*
cuproj-cu12==24.6.*
59 changes: 59 additions & 0 deletions scripts/hydrolakes_to_zarr.py
@@ -0,0 +1,59 @@
import argparse
from pathlib import Path

import geopandas as gpd
import numcodecs
import zarr
from shapely.wkt import dumps
from tqdm import tqdm


def split_hydrolakes(input_path: Path, output_path: Path) -> None:

print(f"Reading gdf file: {input_path}")
gdf = gpd.read_file(filename=input_path)

print("Writing geometries")
geometries = gdf["geometry"].apply(lambda geom: dumps(geom)).values

# Create a Zarr store
root: zarr.Group = zarr.open_group(output_path, mode="a")

# Create datasets for each column
for column in tqdm(
gdf.columns,
desc="writing gdf to zarr",
ncols=140,
ascii=True,
):
if column == "geometry":
root.array(column, data=geometries, dtype=object, object_codec=numcodecs.VLenUTF8())
root.attrs["crs"] = gdf.crs.to_string()
else:
data = gdf[column].values
if data.dtype == 'O':
# Saving as an object
root.array(column, data=data, dtype=object, object_codec=numcodecs.VLenUTF8())
else:
root.array(column, data=data)

print(f"Processing complete! Zarr store saved to: {output_path}")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Convert a shapefile to a Zarr store")
parser.add_argument("input_shp", type=Path, help="Path to the input shapefile")
parser.add_argument(
"output_zarr", type=Path, help="Path to save the output Zarr store"
)

args = parser.parse_args()

input_path = Path(args.input_shp)
output_path = Path(args.output_zarr)
if not input_path.exists():
raise FileNotFoundError(f"Input shapefile not found: {input_path}")
if output_path.exists():
raise FileExistsError(f"Output Zarr store already exists: {output_path}")

split_hydrolakes(input_path, output_path)
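Review note: a sketch of round-tripping the store this script writes (paths are placeholders; the Lake_area column and the WKT geometry encoding come from the code above):

    # Hypothetical invocation:
    #   python scripts/hydrolakes_to_zarr.py HydroLAKES_polys.shp hydrolakes.zarr
    import zarr
    from shapely import wkt

    root = zarr.open_group("hydrolakes.zarr", mode="r")
    lake_areas = root["Lake_area"][:]  # numeric columns are stored as-is
    geometries = [wkt.loads(g) for g in root["geometry"][:]]  # WKT -> shapely
    print(root.attrs["crs"], len(geometries))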
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -4,6 +4,7 @@
import numpy as np
import pytest


@pytest.fixture
def sample_gage_cfg():
with hydra.initialize(