Extension: Q_prime_sum data #23

Merged · 12 commits · Jun 17, 2024
2 changes: 1 addition & 1 deletion .github/workflows/linting.yml
@@ -7,7 +7,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ['3.9', '3.10', '3.11', '3.12']
+        python-version: ['3.9', '3.10', '3.11']
 
     steps:
       - uses: actions/checkout@v3
2 changes: 2 additions & 0 deletions .gitignore
@@ -164,3 +164,5 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
+tbindas/*
+tbindas/.conda/*
3 changes: 3 additions & 0 deletions README.md
@@ -20,3 +20,6 @@ Official citation coming soon. For now, use:
   howpublished = {\url{https://github.com/taddyb/marquette}},
 }
 ```
+
+## cuGraph
+`https://docs.rapids.ai/install`
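The RAPIDS install linked above is what backs the `nx.descendants(..., backend="cugraph")` calls added in `marquette/merit/extensions.py` below. A minimal sketch to confirm the backend is visible to networkx (assumes the RAPIDS cuGraph backend from `requirements-cuda.txt` is installed; if it is not, the call raises):

```python
import networkx as nx

# Sketch: verify the "cugraph" networkx backend dispatches on a toy graph.
G = nx.DiGraph([(0, 1), (1, 2)])
print(nx.descendants(G, 0, backend="cugraph"))  # expected: {1, 2}
```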
22 changes: 16 additions & 6 deletions marquette/__main__.py
@@ -2,6 +2,7 @@
 import time
 
 import hydra
+import zarr
 from omegaconf import DictConfig
 
 log = logging.getLogger(__name__)
@@ -37,12 +38,12 @@ def main(cfg: DictConfig) -> None:
         log.info(f"Creating MERIT {cfg.zone} Connectivity Matrix (N) for gages")
         create_N(cfg, edges)
 
-        log.info(f"Mapping {cfg.zone} Streamflow to TMs")
-        create_TMs(cfg, edges)
-
         log.info("Converting Streamflow to zarr")
         write_streamflow(cfg, edges)
 
+        log.info(f"Mapping {cfg.zone} Streamflow to Nodes/Edges")
+        create_TMs(cfg, edges)
+
         log.info("Running Data Post-Processing Extensions")
         run_extensions(cfg, edges)
 
@@ -52,7 +53,7 @@ def main(cfg: DictConfig) -> None:
         log.error(f"incorrect name specified: {cfg.name}")
 
 
-def run_extensions(cfg, edges):
+def run_extensions(cfg: DictConfig, edges: zarr.Group) -> None:
     """
     The function for running post-processing data extensions
 
@@ -65,15 +66,15 @@ def run_extensions(cfg, edges):
 
         log.info("Adding soils information to your MERIT River Graph")
         if "ksat" in edges:
-            log.info("global_dhbv_static_inputs already exists in zarr format")
+            log.info("soils information already exists in zarr format")
         else:
             soils_data(cfg, edges)
     if "pet_forcing" in cfg.extensions:
         from marquette.merit.extensions import pet_forcing
 
         log.info("Adding PET forcing to your MERIT River Graph")
         if "pet" in edges:
-            log.info("global_dhbv_static_inputs already exists in zarr format")
+            log.info("PET forcing already exists in zarr format")
         else:
             pet_forcing(cfg, edges)
     if "global_dhbv_static_inputs" in cfg.extensions:
@@ -94,6 +95,15 @@ def run_extensions(cfg, edges):
         else:
             calculate_incremental_drainage_area(cfg, edges)
 
+    if "q_prime_sum" in cfg.extensions:
+        from marquette.merit.extensions import calculate_q_prime_summation
+
+        log.info("Adding q_prime_sum to your MERIT River Graph")
+        if "summed_q_prime" in edges:
+            log.info("q_prime_sum already exists in zarr format")
+        else:
+            calculate_q_prime_summation(cfg, edges)
+
 
 if __name__ == "__main__":
     main()
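Each extension follows the same idempotent pattern: check for its output array in the `edges` zarr group and skip the computation on re-runs. A minimal sketch of that guard (the store path here is hypothetical, for illustration only):

```python
import numpy as np
import zarr

# Hypothetical local store path, standing in for cfg.create_edges.edges.
edges = zarr.open_group("/tmp/example_edges.zarr", mode="a")

if "summed_q_prime" in edges:
    print("q_prime_sum already exists in zarr format")
else:
    # calculate_q_prime_summation(cfg, edges) would write this array;
    # zeros here are placeholder data.
    edges.array(name="summed_q_prime", data=np.zeros((365, 10)))
```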
19 changes: 10 additions & 9 deletions marquette/conf/config.yaml
@@ -1,38 +1,39 @@
 name: MERIT
 data_path: /projects/mhpi/data/${name}
-zone: 74
+zone: 73
 create_edges:
   buffer: 0.3334
   dx: 2000
-  edges: ${data_path}/zarr/graph/edges/
+  edges: ${data_path}/zarr/graph/CONUS/edges/
   flowlines: ${data_path}/raw/flowlines
 create_N:
   run_whole_zone: False
   drainage_area_treshold: 0.1
   filter_based_on_dataset: True
-  gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gage_9322_intersection.shp
+  gage_buffered_flowline_intersections: ${data_path}/gage_information/gage_flowline_intersections/gnn_dataset_v1_2.shp
   gage_coo_indices: ${data_path}/zarr/gage_coo_indices
-  pad_gage_id: True
-  obs_dataset: ${data_path}/gage_information/obs_csvs/all_gages_info.csv
-  obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/all_gages_info_combined.csv
-  zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/${zone}_all.csv
+  pad_gage_id: False
+  obs_dataset: ${data_path}/gage_information/obs_csvs/GRDC_point_data.csv
+  obs_dataset_output: ${data_path}/gage_information/formatted_gage_csvs/gnn_formatted_basins.csv
+  zone_obs_dataset: ${data_path}/gage_information/formatted_gage_csvs/gnn_zone_${zone}.csv
 create_TMs:
   MERIT:
     save_sparse: True
     TM: ${data_path}/zarr/TMs/sparse_MERIT_FLOWLINES_${zone}
     shp_files: ${data_path}/raw/basins/cat_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
 create_streamflow:
-  version: merit_conus_v3.0
+  version: merit_conus_v6.14
   data_store: ${data_path}/streamflow/zarr/${create_streamflow.version}/${zone}
   obs_attributes: ${data_path}/gage_information/MERIT_basin_area_info
-  predictions: /projects/mhpi/yxs275/DM_output/dPL_local_daymet_new_attr_water_loss_v3_all_merit
+  predictions: /projects/mhpi/yxs275/DM_output/dPL_local_daymet_new_attr_water_loss_v6v10_random_batch_all_merit_forward/
   start_date: 01-01-1980
   end_date: 12-31-2020
 extensions:
   - soils_data
   - pet_forcing
   - global_dhbv_static_inputs
   - incremental_drainage_area
+  - q_prime_sum
 # Hydra Config ------------------------------------------------------------------------#
 hydra:
   help:
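This config is composed with Hydra, so the `${...}` interpolations (e.g. `data_path` into `edges`) resolve when keys are accessed. A minimal sketch of composing it and overriding `zone`, mirroring what `tests/conftest.py` below does (the `config_path` is relative to the calling file, so adjust it to your location):

```python
import hydra

# Sketch: compose the config above and override one key.
with hydra.initialize(version_base="1.3", config_path="marquette/conf"):
    cfg = hydra.compose(config_name="config", overrides=["zone=73"])

print(cfg.zone)                          # 73
print(cfg.create_streamflow.data_store)  # ${...} interpolations resolve here
```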
2 changes: 1 addition & 1 deletion marquette/merit/_TM_calculations.py
@@ -194,7 +194,7 @@ def create_MERIT_FLOW_TM(
         for idx, proportion in zip(indices, proportions):
             column_index = np.where(river_graph_ids == river_graph_ids[idx])[0][0]
             data_np[i][column_index] = proportion
-
+
     if cfg.create_TMs.MERIT.save_sparse:
         log.info("Writing to sparse matrix")
         gage_coo_root = zarr.open_group(Path(cfg.create_TMs.MERIT.TM), mode="a")
18 changes: 14 additions & 4 deletions marquette/merit/_connectivity_matrix.py
@@ -147,11 +147,19 @@ def find_closest_edge(
     # filter based on large-list of gage_locations
     gage_locations_df = pd.read_csv(cfg.create_N.obs_dataset)
     try:
-        gage_ids = gage_locations_df["id"].astype(str).apply(lambda x: x.zfill(8))
+        if cfg.create_N.pad_gage_id:
+            gage_ids = (
+                gage_locations_df["id"].astype(str).apply(lambda x: x.zfill(8))
+            )
+        else:
+            gage_ids = gage_locations_df["id"]
     except KeyError:
-        gage_ids = (
-            gage_locations_df["STAT_ID"].astype(str).apply(lambda x: x.zfill(8))
-        )
+        if cfg.create_N.pad_gage_id:
+            gage_ids = (
+                gage_locations_df["STAT_ID"].astype(str).apply(lambda x: x.zfill(8))
+            )
+        else:
+            gage_ids = gage_locations_df["STAT_ID"]
     gdf = gdf[gdf["STAID"].isin(gage_ids)]
     gdf["COMID"] = gdf["COMID"].astype(int)
     filtered_gdf = filter_by_comid_prefix(gdf, cfg.zone)
@@ -186,6 +194,8 @@ def find_closest_edge(
         result_df["LAT_GAGE"] = result_df["Latitude"]
     except KeyError:
         pass
+    if "HUC02" not in result_df.columns:
+        result_df["HUC02"] = 0
 
     columns = [
         "STAID",
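The new `pad_gage_id` flag exists because USGS-style gage IDs are fixed-width strings that lose leading zeros when a CSV parser reads them as integers, while GRDC-style IDs should be left untouched. A toy illustration of the two branches (values made up):

```python
import pandas as pd

# Toy frame: the first ID originally had a leading zero that CSV parsing dropped.
gage_locations_df = pd.DataFrame({"id": [1013500, 14400000]})

# pad_gage_id: True restores the 8-character fixed width.
padded = gage_locations_df["id"].astype(str).apply(lambda x: x.zfill(8))
print(padded.tolist())  # ['01013500', '14400000']

# pad_gage_id: False leaves IDs exactly as read.
unpadded = gage_locations_df["id"]
print(unpadded.tolist())  # [1013500, 14400000]
```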
144 changes: 88 additions & 56 deletions marquette/merit/extensions.py
@@ -1,7 +1,11 @@
 import logging
 from pathlib import Path
 
+# import cugraph as cnx
+# import cudf
+import cupy as cp
 import geopandas as gpd
+import networkx as nx
 import numpy as np
 import pandas as pd
 import polars as pl
@@ -12,62 +16,6 @@
 log = logging.getLogger(__name__)
 
 
-# class Direction(Enum):
-#     up = "up"
-#     down = "down"
-
-
-# def traverse(df: pl.DataFrame, direction: Direction, count=1):
-#     TODO: Get this to work
-#     if direction == Direction.up:
-#         # Using only the up1 dir as we know it always exists
-#         _traverse = "up1"
-#         renamed_col = "NextDownID"
-#     elif direction == Direction.down:
-#         _traverse = "NextDownID"
-#         renamed_col = "up1"
-#     else:
-#         raise ValueError("Invalid direction. Look at the Direction Enum")
-#     traverse_df = df.clone()
-#     for _ in range(count):
-#         traverse_df = traverse_df.rename(
-#             {"COMID": "_COMID"}
-#         ).with_columns(
-#             pl.when(pl.col(_traverse) == 0)
-#             .then(pl.col("_COMID"))
-#             .otherwise(pl.col(renamed_col))
-#             .alias("COMID")
-#         )
-#         traverse_df = df.join(other=traverse_df, on="COMID", how="semi", join_nulls=True)
-#     traverse_df = traverse_df.drop("_COMID")
-#     return traverse_df
-
-
-# def spatial_nan_filter(
-#     df: pl.DataFrame,
-# ) -> pl.DataFrame:
-#     """
-#     Filling NaN values based on the predictions up or downstream
-#     Using the min value of the attribute is no Up or Downstream values are available
-#     """
-#     # TODO: Get this to work
-#     if df.null_count().to_numpy().sum() == 0:
-#         return df
-#     else:
-#         filled_df = traverse(
-#             df, Direction.down, count=3
-#         )
-#         if df.null_count().to_numpy().sum():
-#             filled_df = traverse(
-#                 filled_df, Direction.up
-#             )
-#         if np.isnan(fill_value):
-#             fill_value = np.nanmin(df[attribute])
-#         nan_fill[i] = fill_value
-#         _attr_mapped[mask] = nan_fill
-#     return _attr_mapped
-
-
 def soils_data(cfg: DictConfig, edges: zarr.Group) -> None:
     flowline_file = (
         Path(cfg.data_path)
@@ -274,3 +222,87 @@ def calculate_incremental_drainage_area(cfg: DictConfig, edges: zarr.Group) -> None:
         name="incremental_drainage_area",
         data=result.select(pl.col("incremental_drainage_area")).to_numpy().squeeze(),
     )
+
+
+def calculate_q_prime_summation(cfg: DictConfig, edges: zarr.Group) -> None:
+    """Creates Q' summed data for all edges in a given MERIT zone
+
+    Parameters
+    ----------
+    cfg: DictConfig
+        The configuration object.
+    edges: zarr.Group
+        The edges group in the MERIT zone
+    """
+    n = 2  # number of splits (used for reducing memory load)
+    cp.cuda.runtime.setDevice(2)  # manually setting the device to 2
+
+    streamflow_group = Path(
+        f"/projects/mhpi/data/MERIT/streamflow/zarr/{cfg.create_streamflow.version}/{cfg.zone}"
+    )
+    if streamflow_group.exists() is False:
+        raise FileNotFoundError("streamflow_group data not found")
+    streamflow_zarr: zarr.Group = zarr.open_group(streamflow_group, mode="r")
+    streamflow_time = streamflow_zarr.time[:]
+    _, counts = cp.unique(edges.segment_sorting_index[:], return_counts=True)  # type: ignore
+    dim_0: int = streamflow_zarr.time.shape[0]  # type: ignore
+    dim_1: int = edges.id.shape[0]  # type: ignore
+    edge_ids = edges.id[:]
+    edge_index_mapping = {v: i for i, v in enumerate(edge_ids)}
+
+    q_prime_np = np.zeros([dim_0, dim_1]).transpose(1, 0)
+    streamflow_data = cp.array(streamflow_zarr.streamflow[:])
+
+    # Generating a networkx DiGraph object
+    df_path = Path(f"{cfg.create_edges.edges}").parent / f"{cfg.zone}_graph_df.csv"
+    if df_path.exists():
+        df = pd.read_csv(df_path)
+    else:
+        source = []
+        target = []
+        for idx, _ in enumerate(
+            tqdm(
+                edges.id[:],
+                desc="creating graph in dictionary form",
+                ascii=True,
+                ncols=140,
+            )
+        ):
+            if edges.ds[idx] != "0_0":
+                source.append(idx)
+                target.append(edge_index_mapping[edges.ds[idx]])
+        df = pd.DataFrame({"source": source, "target": target})
+        df.to_csv(df_path, index=False)
+    G = nx.from_pandas_edgelist(df=df, create_using=nx.DiGraph())
+
+    time_split = np.array_split(streamflow_time, n)  # type: ignore
+    for idx, time_range in enumerate(time_split):
+        q_prime_cp = cp.zeros([time_range.shape[0], dim_1]).transpose(1, 0)
+
+        for jdx, _ in enumerate(
+            tqdm(
+                edge_ids,
+                desc=f"calculating q` sum data for part {idx}",
+                ascii=True,
+                ncols=140,
+            )
+        ):
+            streamflow_ds_id = edges.segment_sorting_index[jdx]
+            num_edges_in_comid = counts[streamflow_ds_id]
+            try:
+                graph = nx.descendants(G, jdx, backend="cugraph")
+                graph.add(jdx)  # Adding the idx to ensure it's counted
+                downstream_idx = np.array(list(graph))  # type: ignore
+                q_prime_cp[downstream_idx] += (streamflow_data[time_range, streamflow_ds_id] / num_edges_in_comid)  # type: ignore
+            except nx.exception.NetworkXError:
+                # This means there is no connectivity from this basin. It's a one-node graph
+                q_prime_cp[jdx] = (streamflow_data[time_range, streamflow_ds_id] / num_edges_in_comid)
+
+        print("Saving GPU Memory to CPU; freeing GPU Memory")
+        q_prime_np[:, time_range] = cp.asnumpy(q_prime_cp)
+        del q_prime_cp
+        cp.get_default_memory_pool().free_all_blocks()
+
+    edges.array(
+        name="summed_q_prime",
+        data=q_prime_np.transpose(1, 0),
+    )
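To make the accumulation concrete: `summed_q_prime` for an edge ends up being its own Q′ plus the Q′ of every edge upstream of it, because each edge pushes its flow onto itself and its full descendant (downstream) set. A CPU-only toy version of that loop (no `cupy`/`cugraph`; graph and numbers are made up):

```python
import networkx as nx
import numpy as np

# Edges 0 and 1 both drain into edge 2 (the outlet).
G = nx.DiGraph([(0, 2), (1, 2)])
q_prime = np.array(
    [[1.0, 2.0],    # edge 0 over two timesteps
     [0.5, 0.5],    # edge 1
     [0.25, 1.0]]   # edge 2
)

summed = np.zeros_like(q_prime)
for idx in range(q_prime.shape[0]):
    # Push edge idx's flow to itself and everything downstream of it,
    # mirroring the nx.descendants(...) | {idx} logic above.
    downstream = list(nx.descendants(G, idx) | {idx})
    summed[downstream] += q_prime[idx]

print(summed[2])  # [1.75, 3.5]: the outlet accumulates all upstream q'
```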
4 changes: 4 additions & 0 deletions requirements-cuda.txt
@@ -0,0 +1,4 @@
+--extra-index-url https://pypi.nvidia.com
+cudf-cu12==24.6.*
+dask-cudf-cu12==24.6.*
+cugraph-cu12==24.6.*
1 change: 1 addition & 0 deletions requirements-test.txt
@@ -0,0 +1 @@
+pytest==8.2.2
1 change: 1 addition & 0 deletions requirements.txt
@@ -11,6 +11,7 @@ hydra-core
 jupyter
 jupyterlab
 matplotlib
+networkx
 numpy
 omegaconf
 pandas
22 changes: 22 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,22 @@
+from pathlib import Path
+
+import hydra
+import numpy as np
+import pytest
+
+@pytest.fixture
+def sample_gage_cfg():
+    with hydra.initialize(
+        version_base="1.3",
+        config_path="../marquette/conf/",
+    ):
+        cfg = hydra.compose(config_name="config")
+        cfg.zone = "73"
+    return cfg
+
+@pytest.fixture
+def q_prime_data() -> np.ndarray:
+    q_prime_path = Path("/projects/mhpi/tbindas/marquette/tests/validated_data/q_prime_89905.npy")
+    if not q_prime_path.exists():
+        raise FileNotFoundError(f"File not found: {q_prime_path}")
+    return np.load(q_prime_path)
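A hypothetical sketch of how these fixtures would be consumed (the test file name and assertions are assumptions for illustration; the PR's actual tests live alongside this conftest):

```python
# tests/test_extensions.py -- hypothetical usage of the fixtures above.
import numpy as np


def test_sample_gage_cfg_zone(sample_gage_cfg):
    # The fixture pins the zone so ${zone} paths resolve to zone 73.
    assert sample_gage_cfg.zone == "73"


def test_q_prime_validated_data_loads(q_prime_data):
    # Validated q' series for gage 89905, loaded from the .npy file above.
    assert isinstance(q_prime_data, np.ndarray)
    assert q_prime_data.size > 0
```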