added edges dataset to s3 (#32)
taddyb authored Dec 19, 2024
1 parent 8bdc25d commit 44ea60b
Showing 2 changed files with 23 additions and 25 deletions.
4 changes: 2 additions & 2 deletions marquette/conf/config.yaml
@@ -1,12 +1,12 @@
 name: merit_s3
-s3_path: s3://mhpi-spatial/marquette/${name}/
+s3_path: s3://mhpi-spatial/marquette/merit
 data_path: /projects/mhpi/data/MERIT
 zone: 73
 gpu: 6
 create_edges:
   buffer: 0.3334
   dx: 2000
-  edges: ${s3_path}/edges/
+  edges: ${s3_path}/edges
   flowlines: ${data_path}/raw/flowlines/riv_pfaf_${zone}_MERIT_Hydro_v07_Basins_v01_bugfix1.shp
 create_N:
   run_whole_zone: False
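
For reference, create_edges.edges now resolves through the ${s3_path} interpolation to a fixed bucket prefix instead of one derived from ${name}. A minimal sketch of checking the resolved value with OmegaConf; the load path assumes you run it from the repo root:

from omegaconf import OmegaConf

# Resolve the interpolated keys the same way Hydra would at runtime.
cfg = OmegaConf.load("marquette/conf/config.yaml")
print(cfg.create_edges.edges)  # expected: s3://mhpi-spatial/marquette/merit/edges
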
44 changes: 21 additions & 23 deletions marquette/merit_s3/create.py
@@ -6,6 +6,7 @@
 import pandas as pd
 import xarray as xr
 import zarr
+import dask.array
 from dask.dataframe.io.io import from_pandas
 from omegaconf import DictConfig
 from tqdm import tqdm
@@ -48,11 +49,10 @@ def write_streamflow(cfg: DictConfig, edges: zarr.Group) -> None:
         log.info("Streamflow data already exists")


-def create_edges(cfg: DictConfig) -> zarr.Group:
+def create_edges(cfg: DictConfig) -> xr.Dataset:
     try:
-        root = xr.open_datatree(cfg.create_edges.edges, engine="zarr")
+        edges = xr.open_dataset(f"{cfg.create_edges.edges}/{str(cfg.zone)}", engine="zarr")
         log.info("Edge data already exists on s3")
-        edges = root[str(cfg.zone)]
     except FileNotFoundError:
         log.info("Edge data does not exist. Creating connections")
         polyline_gdf = gpd.read_file(cfg.create_edges.flowlines)
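
With this change each zone lives in its own Zarr store under the edges prefix, so a reader no longer needs to open the whole datatree. A sketch of opening one zone directly from s3; it assumes the bucket allows anonymous reads and that s3fs is installed (zone 73 mirrors the config above):

import xarray as xr

# Open a single zone's edge dataset straight from the bucket.
edges = xr.open_zarr(
    "s3://mhpi-spatial/marquette/merit/edges/73",
    storage_options={"anon": True},
)
print(edges.data_vars)
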
@@ -171,27 +171,25 @@ def create_edges(cfg: DictConfig) -> zarr.Group:
             merged_df[col] = merged_df[col].astype(dtype=np.int32)
         for col in ["len", "len_dir", "slope", "sinuosity", "stream_drop", "uparea"]:
             merged_df[col] = merged_df[col].astype(dtype=np.float32)
         # xr_dataset = xr.Dataset.from_dataframe(merged_df)
-        ds = merged_df.to_xarray().assign_coords({"merit_basin": merged_df["merit_basin"]})
-        sorted_keys_array = np.array(sorted_keys)
-        sorted_edges = xr.Dataset()
-        for var_name in xr_dataset.data_vars:
-            sorted_edges[var_name] = sort_xarray_dataarray(
-                xr_dataset[var_name],
-                sorted_keys_array,
-                xr_dataset["segment_sorting_index"].values,
-            )
-            shape = sorted_edges[var_name].shape
-            dtype = sorted_edges[var_name].dtype
-            tmp = edges.zeros(var_name, shape=shape, chunks=1000, dtype=dtype)
-            tmp[:] = sorted_edges[var_name].values
-        tmp = edges.zeros(
-            "sorted_keys",
-            shape=sorted_keys_array.shape,
-            chunks=1000,
-            dtype=sorted_keys_array.dtype,
-        )
-        tmp[:] = sorted_keys_array
+        idx = np.argsort(merged_df["uparea"])
+        sorted_df = merged_df.iloc[idx]
+        merit_basins = sorted_df["merit_basin"]
+
+        edges = xr.Dataset(
+            {var: (["comid"], sorted_df[var]) for var in sorted_df.columns if var != "crs"},
+            coords={"comid": merit_basins}
+        )
+        edges.attrs['crs'] = sorted_df["crs"].unique()[0]
+
+        # For faster writes, it's easier to mock the datastore using dask, and then upload the store using to_zarr()
+        # dummies = dask.array.zeros(merit_basins.shape[0], chunks=merit_basins.shape[0])
+        # xr.Dataset(data_vars={"id": ("merit_basins", dummies)}, coords={"merit_basins": merit_basins}).to_zarr(cfg.create_edges.edges, compute=False)
+        edges.to_zarr(
+            store=f"{cfg.create_edges.edges}/{str(cfg.zone)}",
+            mode='w',
+            consolidated=True
+        )
     return edges
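
The commented-out lines above sketch the faster write path the comment describes: initialize the Zarr store lazily with dask so to_zarr(compute=False) writes only metadata, then upload the real values into the pre-created store. A self-contained sketch of that pattern against a local store; the path, variable name, and sizes are illustrative, not from the repo:

import dask.array
import numpy as np
import xarray as xr

store = "edges_mock.zarr"  # hypothetical local path; the repo writes to s3
n = 1_000
coords = {"comid": np.arange(n)}

# 1) Mock the datastore: lazy zeros, so compute=False writes metadata only.
dummies = dask.array.zeros(n, chunks=n)
xr.Dataset({"uparea": ("comid", dummies)}, coords=coords).to_zarr(store, compute=False)

# 2) Upload the actual values into the pre-created store. Coordinate
#    variables are dropped for the region write, as xarray recommends,
#    and the region covers whole chunks.
real = xr.Dataset(
    {"uparea": ("comid", np.random.rand(n).astype(np.float32))}, coords=coords
)
real.drop_vars("comid").to_zarr(store, region={"comid": slice(0, n)})
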

