diff --git a/marquette/conf/config.yaml b/marquette/conf/config.yaml
index 878d56d..f313ac6 100644
--- a/marquette/conf/config.yaml
+++ b/marquette/conf/config.yaml
@@ -1,12 +1,12 @@
 name: merit_s3
-s3_path: s3://mhpi-spatial/marquette/${name}/
+s3_path: s3://mhpi-spatial/marquette/merit
 data_path: /projects/mhpi/data/MERIT
 zone: 73
 gpu: 6
 create_edges:
   buffer: 0.3334
   dx: 2000
-  edges: ${s3_path}/edges/
+  edges: ${s3_path}/edges
   flowlines: ${data_path}/raw/flowlines/riv_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
 create_N:
   run_whole_zone: False
diff --git a/marquette/merit_s3/create.py b/marquette/merit_s3/create.py
index e519c19..60a9480 100644
--- a/marquette/merit_s3/create.py
+++ b/marquette/merit_s3/create.py
@@ -6,6 +6,7 @@
 import pandas as pd
 import xarray as xr
 import zarr
+import dask.array
 from dask.dataframe.io.io import from_pandas
 from omegaconf import DictConfig
 from tqdm import tqdm
@@ -48,11 +49,10 @@ def write_streamflow(cfg: DictConfig, edges: zarr.Group) -> None:
         log.info("Streamflow data already exists")
 
 
-def create_edges(cfg: DictConfig) -> zarr.Group:
+def create_edges(cfg: DictConfig) -> xr.Dataset:
     try:
-        root = xr.open_datatree(cfg.create_edges.edges, engine="zarr")
+        edges = xr.open_dataset(f"{cfg.create_edges.edges}/{str(cfg.zone)}", engine="zarr")
         log.info("Edge data already exists on s3")
-        edges = root[str(cfg.zone)]
     except FileNotFoundError:
         log.info("Edge data does not exist. Creating connections")
         polyline_gdf = gpd.read_file(cfg.create_edges.flowlines)
@@ -171,27 +171,25 @@ def create_edges(cfg: DictConfig) -> zarr.Group:
             merged_df[col] = merged_df[col].astype(dtype=np.int32)
         for col in ["len", "len_dir", "slope", "sinuosity", "stream_drop", "uparea"]:
             merged_df[col] = merged_df[col].astype(dtype=np.float32)
-        # xr_dataset = xr.Dataset.from_dataframe(merged_df)
-        ds = merged_df.to_xarray().assign_coords({"merit_basin": merged_df["merit_basin"]})
-        sorted_keys_array = np.array(sorted_keys)
-        sorted_edges = xr.Dataset()
-        for var_name in xr_dataset.data_vars:
-            sorted_edges[var_name] = sort_xarray_dataarray(
-                xr_dataset[var_name],
-                sorted_keys_array,
-                xr_dataset["segment_sorting_index"].values,
-            )
-            shape = sorted_edges[var_name].shape
-            dtype = sorted_edges[var_name].dtype
-            tmp = edges.zeros(var_name, shape=shape, chunks=1000, dtype=dtype)
-            tmp[:] = sorted_edges[var_name].values
-        tmp = edges.zeros(
-            "sorted_keys",
-            shape=sorted_keys_array.shape,
-            chunks=1000,
-            dtype=sorted_keys_array.dtype,
+
+        idx = np.argsort(merged_df["uparea"])
+        sorted_df = merged_df.iloc[idx]
+        merit_basins = sorted_df["merit_basin"]
+
+        edges = xr.Dataset(
+            {var: (["comid"], sorted_df[var]) for var in sorted_df.columns if var != "crs"},
+            coords={"comid": merit_basins}
+        )
+        edges.attrs['crs'] = sorted_df["crs"].unique()[0]
+
+        # For faster writes, it's easier to mock the datastore using dask, and then upload the store using to_zarr()
+        # dummies = dask.array.zeros(merit_basins.shape[0], chunks=merit_basins.shape[0])
+        # xr.Dataset(data_vars={"id": ("merit_basins", dummies)}, coords={"merit_basins": merit_basins}).to_zarr(cfg.create_edges.edges, compute=False)
+        edges.to_zarr(
+            store=f"{cfg.create_edges.edges}/{str(cfg.zone)}",
+            mode='w',
+            consolidated=True
         )
-        tmp[:] = sorted_keys_array
         return edges
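A note on the commented-out dask lines above: they refer to xarray's two-step zarr write, where `to_zarr(compute=False)` on a dataset backed by lazy dask arrays writes only the store metadata, and the real values are streamed in afterwards with `region=` writes. Below is a minimal sketch of that pattern under stated assumptions: the local store path `edges_73.zarr`, the variable name `uparea`, and the array size are all illustrative, not taken from this repo (an `s3://` URL would work the same way via s3fs).

```python
import dask.array
import numpy as np
import xarray as xr

store = "edges_73.zarr"  # hypothetical path, for illustration only
n = 10_000               # illustrative number of edges
coords = {"comid": np.arange(n)}

# Step 1: mock the on-disk layout with lazy dask zeros. Because the arrays
# are lazy and compute=False is set, to_zarr() writes only the store
# metadata; no array payloads are computed or uploaded.
dummies = dask.array.zeros(n, chunks=n)
template = xr.Dataset({"uparea": ("comid", dummies)}, coords=coords)
template.to_zarr(store, mode="w", compute=False)

# Step 2: write the actual values into the pre-allocated store, one region
# at a time. Each region write only touches the chunks it overlaps, so
# regions can be written independently (e.g. from separate workers).
values = xr.Dataset({"uparea": ("comid", np.random.rand(n))}, coords=coords)
values.isel(comid=slice(0, n)).to_zarr(store, region={"comid": slice(0, n)})
```

The diff ultimately leaves this two-step path commented out and does a single eager `to_zarr(mode='w', consolidated=True)` instead, which is simpler when the whole zone fits in memory; the sketch is mainly relevant if zones ever need to be uploaded chunk by chunk.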