From 59582ed549b8921447273e0b64be66b0f7c680d2 Mon Sep 17 00:00:00 2001 From: Tadd Bindas Date: Sat, 6 Jul 2024 10:32:42 -0500 Subject: [PATCH 1/2] formatted files, created a script to move the hydrolakes shp file into zarr --- marquette/__main__.py | 16 +++---- marquette/conf/config.yaml | 4 ++ marquette/merit/_map_lake_points.py | 36 ++++++++++++++++ marquette/merit/create.py | 52 +++++++++++++---------- marquette/merit/extensions.py | 66 +++++++++++++++++------------ marquette/merit/map.py | 8 +--- scripts/hydrolakes_to_zarr.py | 59 ++++++++++++++++++++++++++ tests/conftest.py | 1 + 8 files changed, 178 insertions(+), 64 deletions(-) create mode 100644 marquette/merit/_map_lake_points.py create mode 100644 scripts/hydrolakes_to_zarr.py diff --git a/marquette/__main__.py b/marquette/__main__.py index 58b6b78..8dc4564 100644 --- a/marquette/__main__.py +++ b/marquette/__main__.py @@ -5,7 +5,7 @@ import zarr from omegaconf import DictConfig -log = logging.getLogger(__name__) +log = logging.getLogger(name=__name__) @hydra.main( @@ -24,12 +24,8 @@ def main(cfg: DictConfig) -> None: if cfg.name.lower() == "hydrofabric": raise ImportError("Hydrofabric functionality not yet supported") elif cfg.name.lower() == "merit": - from marquette.merit.create import ( - create_edges, - create_N, - create_TMs, - write_streamflow, - ) + from marquette.merit.create import (create_edges, create_N, create_TMs, + map_lake_points, write_streamflow) start = time.perf_counter() log.info(f"Creating MERIT {cfg.zone} River Graph") @@ -44,6 +40,9 @@ def main(cfg: DictConfig) -> None: log.info("Converting Streamflow to zarr") write_streamflow(cfg, edges) + log.info("Mapping Lake Pour Points to Edges") + map_lake_points(cfg, edges) + log.info("Running Data Post-Processing Extensions") run_extensions(cfg, edges) @@ -87,7 +86,8 @@ def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None: global_dhbv_static_inputs(cfg, edges) if "incremental_drainage_area" in cfg.extensions: - from marquette.merit.extensions import calculate_incremental_drainage_area + from marquette.merit.extensions import \ + calculate_incremental_drainage_area log.info("Adding edge/catchment area input data to your MERIT River Graph") if "incremental_drainage_area" in edges: diff --git a/marquette/conf/config.yaml b/marquette/conf/config.yaml index b85560f..f85964d 100644 --- a/marquette/conf/config.yaml +++ b/marquette/conf/config.yaml @@ -28,6 +28,10 @@ create_streamflow: predictions: /projects/mhpi/yxs275/DM_output/dPL_local_daymet_new_attr_water_loss_v6v10_random_batch_all_merit_forward/ start_date: 01-01-1980 end_date: 12-31-2020 +map_lake_points: + buffer: 250 + hydrolakes: /projects/mhpi/data/hydroLakes/conus_hydrolakes.gpkg + edge_lakes: ${data_path}/zarr/lakes/edges/ extensions: - soils_data - pet_forcing diff --git a/marquette/merit/_map_lake_points.py b/marquette/merit/_map_lake_points.py new file mode 100644 index 0000000..fb9044c --- /dev/null +++ b/marquette/merit/_map_lake_points.py @@ -0,0 +1,36 @@ +import logging +from pathlib import Path + +import cuspatial +import geopandas as gpd +import pandas as pd +import zarr +from omegaconf import DictConfig +from shapely.geometry import LineString +from shapely.wkt import loads + +log = logging.getLogger(name=__name__) + + +def _map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None: + """A function that reads in a gdf of hydrolakes information, then intersects that point with edge data + + Parameters + ---------- + cfg: DictConfig + The configuration object + edges: zarr.Group + The zarr group containing the edges + """ + log.info("Reading in HydroLAKES data") + data_path = Path(cfg.map_lake_points.hydrolakes) + if not data_path.exists(): + msg = f"{data_path} does not exist" + log.exception(msg) + raise FileNotFoundError(msg) + gdf = gpd.read_file(data_path) + reserviors_geoseries = cuspatial.GeoSeries(gdf["geometry"]) + edge_coords = [loads(coords) for coords in edges.coords] + edge_geoseries = cuspatial.GeoSeries(gpd.GeoSeries(edge_coords)) + log.info("Intersecting HydroLAKES data with edge data") + for reservoir in reserviors_geoseries diff --git a/marquette/merit/create.py b/marquette/merit/create.py index c6da455..37185c5 100644 --- a/marquette/merit/create.py +++ b/marquette/merit/create.py @@ -10,30 +10,19 @@ from omegaconf import DictConfig from tqdm import tqdm -from marquette.merit._connectivity_matrix import ( - create_gage_connectivity, - map_gages_to_zone, - new_zone_connectivity, -) +from marquette.merit._connectivity_matrix import (create_gage_connectivity, + map_gages_to_zone, + new_zone_connectivity) from marquette.merit._edge_calculations import ( - calculate_num_edges, - create_segment, - find_flowlines, - many_segment_to_edge_partition, - singular_segment_to_edge_partition, - sort_xarray_dataarray, -) + calculate_num_edges, create_segment, find_flowlines, + many_segment_to_edge_partition, singular_segment_to_edge_partition, + sort_xarray_dataarray) +from marquette.merit._map_lake_points import _map_lake_points from marquette.merit._streamflow_conversion_functions import ( - calculate_huc10_flow_from_individual_files, - calculate_merit_flow, - separate_basins, -) -from marquette.merit._TM_calculations import ( - create_HUC_MERIT_TM, - create_MERIT_FLOW_TM, - # create_sparse_MERIT_FLOW_TM, - join_geospatial_data, -) + calculate_huc10_flow_from_individual_files, calculate_merit_flow, + separate_basins) +from marquette.merit._TM_calculations import ( # create_sparse_MERIT_FLOW_TM, + create_HUC_MERIT_TM, create_MERIT_FLOW_TM, join_geospatial_data) log = logging.getLogger(__name__) @@ -242,3 +231,22 @@ def create_TMs(cfg: DictConfig, edges: zarr.Group) -> None: # create_sparse_MERIT_FLOW_TM(cfg, edges) # else: create_MERIT_FLOW_TM(cfg, edges) + + +def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None: + """Maps HydroLAKES pour points to the corresponding edge + + Parameters + ---------- + cfg: DictConfig + The configuration object + edges: zarr.Group + The zarr group containing the edges + """ + reservoirs = zarr.open_group(Path(cfg.map_lake_points.edge_lakes), mode="a") + zone_root = reservoirs.require_group(cfg.zone) + if "Hylak_id" in zone_root: + log.info("HydroLakes Intersection already exists in Zarr format") + else: + log.info("Mapping HydroLakes Pour Points to Edges") + _map_lake_points(cfg, edges) diff --git a/marquette/merit/extensions.py b/marquette/merit/extensions.py index 9e54704..c35dd4b 100644 --- a/marquette/merit/extensions.py +++ b/marquette/merit/extensions.py @@ -224,9 +224,9 @@ def calculate_incremental_drainage_area(cfg: DictConfig, edges: zarr.Group) -> N ) -def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None: +def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None: """Creates Q` summed data for all edges in a given MERIT zone - + Parameters: ---------- cfg: DictConfig @@ -234,9 +234,9 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None: edges: zarr.Group The edges group in the MERIT zone """ - n = 1 # number of splits (used for reducing memory load) + n = 1 # number of splits (used for reducing memory load) cp.cuda.runtime.setDevice(2) # manually setting the device to 2 - + streamflow_group = Path( f"/projects/mhpi/data/MERIT/streamflow/zarr/{cfg.create_streamflow.version}/{cfg.zone}" ) @@ -245,8 +245,8 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None: streamflow_zarr: zarr.Group = zarr.open_group(streamflow_group, mode="r") streamflow_time = streamflow_zarr.time[:] # _, counts = cp.unique(edges.segment_sorting_index[:], return_counts=True) # type: ignore - dim_0 : int = streamflow_zarr.time.shape[0] # type: ignore - dim_1 : int = streamflow_zarr.COMID.shape[0] # type: ignore + dim_0: int = streamflow_zarr.time.shape[0] # type: ignore + dim_1: int = streamflow_zarr.COMID.shape[0] # type: ignore edge_ids = np.array(edges.id[:]) edges_segment_sorting_index = cp.array(edges.segment_sorting_index[:]) edge_index_mapping = {v: i for i, v in enumerate(edge_ids)} @@ -254,7 +254,6 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None: q_prime_np = np.zeros([dim_0, dim_1]).transpose(1, 0) streamflow_data = cp.array(streamflow_zarr.streamflow[:]) - # Generating a networkx DiGraph object df_path = Path(f"{cfg.create_edges.edges}").parent / f"{cfg.zone}_graph_df.csv" if df_path.exists(): @@ -264,47 +263,58 @@ def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None: target = [] for idx, _ in enumerate( tqdm( - edges.id[:], - desc="creating graph in dictionary form", - ascii=True, - ncols=140, - ) - ): + edges.id[:], + desc="creating graph in dictionary form", + ascii=True, + ncols=140, + ) + ): if edges.ds[idx] != "0_0": source.append(idx) target.append(edge_index_mapping[edges.ds[idx]]) df = pd.DataFrame({"source": source, "target": target}) df.to_csv(df_path, index=False) - G = nx.from_pandas_edgelist(df=df, create_using=nx.DiGraph(),) - - time_split = np.array_split(streamflow_time, n) #type: ignore + G = nx.from_pandas_edgelist( + df=df, + create_using=nx.DiGraph(), + ) + + time_split = np.array_split(streamflow_time, n) # type: ignore for idx, time_range in enumerate(time_split): q_prime_cp = cp.zeros([time_range.shape[0], dim_1]).transpose(1, 0) - for jdx, _ in enumerate(tqdm( - edge_ids, - desc=f"calculating q` sum data for part {idx}", - ascii=True, - ncols=140, - )): + for jdx, _ in enumerate( + tqdm( + edge_ids, + desc=f"calculating q` sum data for part {idx}", + ascii=True, + ncols=140, + ) + ): streamflow_ds_id = edges_segment_sorting_index[jdx] # num_edges_in_comid = counts[streamflow_ds_id] try: graph = nx.descendants(G, jdx, backend="cugraph") graph.add(jdx) # Adding the idx to ensure it's counted downstream_idx = np.array(list(graph)) # type: ignore - downstream_comid_idx = cp.unique(edges_segment_sorting_index[downstream_idx]) # type: ignore - q_prime_cp[downstream_comid_idx] += streamflow_data[time_range, streamflow_ds_id] # type: ignore + downstream_comid_idx = cp.unique( + edges_segment_sorting_index[downstream_idx] + ) # type: ignore + q_prime_cp[downstream_comid_idx] += streamflow_data[ + time_range, streamflow_ds_id + ] # type: ignore except nx.exception.NetworkXError: # This means there is no connectivity from this basin. It's one-node graph - q_prime_cp[streamflow_ds_id] = streamflow_data[time_range, streamflow_ds_id] - + q_prime_cp[streamflow_ds_id] = streamflow_data[ + time_range, streamflow_ds_id + ] + print("Saving GPU Memory to CPU; freeing GPU Memory") q_prime_np[:, time_range] = cp.asnumpy(q_prime_cp) del q_prime_cp cp.get_default_memory_pool().free_all_blocks() - + edges.array( name="summed_q_prime", data=q_prime_np.transpose(1, 0), - ) + ) diff --git a/marquette/merit/map.py b/marquette/merit/map.py index 600570d..ac89a99 100644 --- a/marquette/merit/map.py +++ b/marquette/merit/map.py @@ -12,12 +12,8 @@ from scipy.sparse import csr_matrix from tqdm import tqdm -from marquette.merit._graph import ( - Segment, - data_to_csv, - get_edge_counts, - segments_to_edges, -) +from marquette.merit._graph import (Segment, data_to_csv, get_edge_counts, + segments_to_edges) log = logging.getLogger(__name__) diff --git a/scripts/hydrolakes_to_zarr.py b/scripts/hydrolakes_to_zarr.py new file mode 100644 index 0000000..8300162 --- /dev/null +++ b/scripts/hydrolakes_to_zarr.py @@ -0,0 +1,59 @@ +import argparse +from pathlib import Path + +import geopandas as gpd +import numcodecs +import zarr +from shapely.wkt import dumps +from tqdm import tqdm + + +def split_hydrolakes(input_path: Path, output_path: Path) -> None: + + print(f"Reading gdf file: {input_path}") + gdf = gpd.read_file(filename=input_path) + + print("Writing geometries") + geometries = gdf["geometry"].apply(lambda geom: dumps(geom)).values + + # Create a Zarr store + root: zarr.Group = zarr.open_group(output_path, mode="a") + + # Create datasets for each column + for column in tqdm( + gdf.columns, + desc="writing gdf to zarr", + ncols=140, + ascii=True, + ): + if column == "geometry": + root.array(column, data=geometries, dtype=object, object_codec=numcodecs.VLenUTF8()) + root.attrs["crs"] = gdf.crs.to_string() + else: + data = gdf[column].values + if data.dtype == 'O': + # Saving as an object + root.array(column, data=geometries, dtype=object, object_codec=numcodecs.VLenUTF8()) + else: + root.array(column, data=data) + + print(f"Processing complete! Zarr store saved to: {output_path}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Convert a shapefile to a Zarr store") + parser.add_argument("input_shp", type=Path, help="Path to the input shapefile") + parser.add_argument( + "output_zarr", type=Path, help="Path to save the output Zarr store" + ) + + args = parser.parse_args() + + input_path = Path(args.input_shp) + output_path = Path(args.output_zarr) + if not input_path.exists(): + raise FileNotFoundError(f"Input shapefile not found: {input_path}") + if output_path.exists(): + raise FileExistsError(f"Output Zarr store already exists: {output_path}") + + split_hydrolakes(input_path, output_path) diff --git a/tests/conftest.py b/tests/conftest.py index 0ed43d0..0c6cc81 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ import numpy as np import pytest + @pytest.fixture def sample_gage_cfg(): with hydra.initialize( From e55c20c5c70814bc9015690d3329ecb26f20694f Mon Sep 17 00:00:00 2001 From: Tadd Bindas Date: Sat, 6 Jul 2024 19:13:10 -0500 Subject: [PATCH 2/2] added reservoir code --- marquette/conf/config.yaml | 5 ++- marquette/merit/_map_lake_points.py | 54 +++++++++++++++++++++-------- marquette/merit/create.py | 4 +-- requirements-cuda.txt | 2 ++ 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/marquette/conf/config.yaml b/marquette/conf/config.yaml index f85964d..a3c0403 100644 --- a/marquette/conf/config.yaml +++ b/marquette/conf/config.yaml @@ -29,9 +29,8 @@ create_streamflow: start_date: 01-01-1980 end_date: 12-31-2020 map_lake_points: - buffer: 250 - hydrolakes: /projects/mhpi/data/hydroLakes/conus_hydrolakes.gpkg - edge_lakes: ${data_path}/zarr/lakes/edges/ + lake_points: /projects/mhpi/data/hydroLakes/merit_intersected_data/RIV_lake_intersection_${zone}.shp + zarr: /projects/mhpi/data/hydroLakes/hydrolakes.zarr extensions: - soils_data - pet_forcing diff --git a/marquette/merit/_map_lake_points.py b/marquette/merit/_map_lake_points.py index fb9044c..e8b5ab8 100644 --- a/marquette/merit/_map_lake_points.py +++ b/marquette/merit/_map_lake_points.py @@ -1,19 +1,16 @@ import logging from pathlib import Path -import cuspatial import geopandas as gpd -import pandas as pd +import numpy as np import zarr from omegaconf import DictConfig -from shapely.geometry import LineString -from shapely.wkt import loads +from tqdm import tqdm log = logging.getLogger(name=__name__) - def _map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None: - """A function that reads in a gdf of hydrolakes information, then intersects that point with edge data + """A function that reads in a gdf of hydrolakes information, finds its corresponding edge, then saves the data Parameters ---------- @@ -22,15 +19,44 @@ def _map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None: edges: zarr.Group The zarr group containing the edges """ - log.info("Reading in HydroLAKES data") - data_path = Path(cfg.map_lake_points.hydrolakes) + data_path = Path(cfg.map_lake_points.lake_points) if not data_path.exists(): - msg = f"{data_path} does not exist" + msg = "Cannot find the lake points file" log.exception(msg) raise FileNotFoundError(msg) gdf = gpd.read_file(data_path) - reserviors_geoseries = cuspatial.GeoSeries(gdf["geometry"]) - edge_coords = [loads(coords) for coords in edges.coords] - edge_geoseries = cuspatial.GeoSeries(gpd.GeoSeries(edge_coords)) - log.info("Intersecting HydroLAKES data with edge data") - for reservoir in reserviors_geoseries + lake_comids = gdf["COMID"].astype(int).values + edges_comids : np.ndarray = edges["merit_basin"][:].astype(np.int32) # type: ignore + + hylak_id = np.full(len(edges_comids), -1, dtype=np.int32) + grand_id = np.full_like(edges_comids, -1, dtype=np.int32) + lake_area = np.full_like(edges_comids, -1, dtype=np.float32) + vol_total = np.full_like(edges_comids, -1, dtype=np.float32) + depth_avg = np.full_like(edges_comids, -1, dtype=np.float32) + + for idx, lake_id in enumerate(tqdm( + lake_comids, + desc="Mapping Lake COMIDS to edges", + ncols=140, + ascii=True, + )) : + jdx = np.where(edges_comids == lake_id)[0] + if not jdx.size: + log.info(f"No lake found for COMID {lake_id}") + else: + # Assumung the pour point is at the end of the COMID + edge_idx = jdx[-1] + lake_row = gdf.iloc[idx] + hylak_id[edge_idx] = lake_row["Hylak_id"] + grand_id[edge_idx] = lake_row["Grand_id"] + lake_area[edge_idx] = lake_row["Lake_area"] + vol_total[edge_idx] = lake_row["Vol_total"] + depth_avg[edge_idx] = lake_row["Depth_avg"] + + edges.array("hylak_id", data=hylak_id) + edges.array("grand_id", data=grand_id) + edges.array("lake_area", data=lake_area) + edges.array("vol_total", data=vol_total) + edges.array("depth_avg", data=depth_avg) + + log.info("Wrote Lake data for zones to zarr") diff --git a/marquette/merit/create.py b/marquette/merit/create.py index 37185c5..7143dad 100644 --- a/marquette/merit/create.py +++ b/marquette/merit/create.py @@ -243,9 +243,7 @@ def map_lake_points(cfg: DictConfig, edges: zarr.Group) -> None: edges: zarr.Group The zarr group containing the edges """ - reservoirs = zarr.open_group(Path(cfg.map_lake_points.edge_lakes), mode="a") - zone_root = reservoirs.require_group(cfg.zone) - if "Hylak_id" in zone_root: + if "hylak_id" in edges: log.info("HydroLakes Intersection already exists in Zarr format") else: log.info("Mapping HydroLakes Pour Points to Edges") diff --git a/requirements-cuda.txt b/requirements-cuda.txt index 9ed2c26..454df43 100644 --- a/requirements-cuda.txt +++ b/requirements-cuda.txt @@ -2,3 +2,5 @@ cudf-cu12==24.6.* dask-cudf-cu12==24.6.* cugraph-cu12==24.6.* +cuspatial-cu12==24.6.* +cuproj-cu12==24.6.*