Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test script to process nwm forcing into ngen input #26

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
26 changes: 26 additions & 0 deletions ngen_forcing/defs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import rasterio.mask as riomask


def polymask(dataset, invert=False, all_touched=False):
    """Return a one-argument masking function bound to *dataset*.

    The returned callable takes a single polygon geometry and delegates
    to ``rasterio.mask.raster_geometry_mask`` with ``crop=True``,
    producing the ``(mask, transform, window)`` triple for that polygon.
    Binding the dataset and options up front lets callers ``map`` the
    result over many geometries.
    """

    def _apply(geometry):
        return riomask.raster_geometry_mask(
            dataset,
            [geometry],
            invert=invert,
            all_touched=all_touched,
            crop=True,
        )

    return _apply


def xr_read_window(ds, window, mask=None):
    """Slice *ds* down to *window*, optionally masking the result.

    Parameters
    ----------
    ds : object exposing ``isel`` (e.g. an xarray Dataset/DataArray).
    window : mapping of dimension name -> slice, passed to ``ds.isel``.
    mask : optional boolean array; when given, values outside the mask
        are dropped via ``data.where(mask)``.
    """
    windowed = ds.isel(window)
    return windowed if mask is None else windowed.where(mask)


def xr_read_window_time(ds, window, mask=None, idx=None, time=None):
    """Windowed read that passes *idx* and *time* through unchanged.

    Same slicing/masking as ``xr_read_window``, but returns an
    ``(idx, time, data)`` tuple so callers running this in a worker
    pool can match each completed result back to its feature index
    and timestamp.
    """
    windowed = ds.isel(window)
    if mask is not None:
        windowed = windowed.where(mask)
    return idx, time, windowed
258 changes: 258 additions & 0 deletions ngen_forcing/process_nwm_forcing_to_ngen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
from defs import xr_read_window, polymask, xr_read_window_time
from rasterio import _io, windows
import xarray as xr
import pandas as pd


class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin):
    """In-memory rasterio dataset augmented with window helper methods.

    Combines rasterio's (private) ``_io.MemoryDataset`` with
    ``WindowMethodsMixin`` so that window arithmetic — needed by the
    polygon masking in this module — can run against a raster array
    held in RAM rather than a file on disk.
    """

    pass


def get_forcing_dict_newway(
    feature_index,
    feature_list,
    folder_prefix,
    file_list,
    var_list,
):
    """Compute per-feature mean forcing values, feature-outer loop.

    For each polygon in *feature_list* a raster mask/window is computed
    once against a template grid built from the first file's ``U2D``
    variable; every NetCDF file is then cropped to that window and its
    variables averaged.

    Parameters
    ----------
    feature_index : index for the per-variable result DataFrames.
    feature_list : iterable of polygon geometries.
    folder_prefix : pathlib.Path of the directory holding the files.
    file_list : NetCDF file names (assumed one timestep each).
    var_list : variable names to aggregate.

    Returns
    -------
    dict mapping variable name -> DataFrame (features x timesteps).
    """
    reng = "rasterio"

    # Template raster: all polygon masks are computed against the grid
    # of the first file's U2D variable.
    _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
    _u2d = MemoryDataset(
        _xds_dummy.U2D.values,
        transform=_xds_dummy.U2D.rio.transform(),
        gcps=None,
        rpcs=None,
        crs=None,
        copy=False,
    )

    ds_list = [
        xr.open_dataset(folder_prefix.joinpath(_nc_file), engine=reng)
        for _nc_file in file_list
    ]

    df_dict = {_v: pd.DataFrame(index=feature_index) for _v in var_list}

    try:
        n_features = len(feature_list)
        for i, feature in enumerate(feature_list):
            # "\r" keeps the progress report on a single console line.
            # (Previously round(i/n, 5)*100, which rounds before scaling
            # and prints float noise.)
            print(f"{i}, {i / n_features:.3%}".ljust(40), end="\r")
            mask, _, window = polymask(_u2d)(feature)
            mask = xr.DataArray(mask, dims=["y", "x"])
            winslices = dict(zip(["y", "x"], window.toslices()))
            for _xds in ds_list:
                time_value = _xds.time.values[0]
                cropped = xr_read_window(_xds, winslices, mask=mask)
                stats = cropped.mean()
                for var in var_list:
                    df_dict[var].loc[i, time_value] = stats[var]
    finally:
        # Close datasets with a plain loop (not a side-effect
        # comprehension) and release the template dataset, which was
        # previously leaked.
        for ds in ds_list:
            ds.close()
        _xds_dummy.close()

    return df_dict


def get_forcing_dict_newway_parallel(
    feature_index,
    feature_list,
    folder_prefix,
    file_list,
    var_list,
    para="thread",
    para_n=2,
):
    """Parallel variant of ``get_forcing_dict_newway``.

    Each windowed read is submitted to a worker pool; results come back
    tagged with ``(feature index, timestamp, data)`` so they can be
    collected out of order.

    Parameters
    ----------
    para : "process" selects a ProcessPoolExecutor; any other value
        (including the default "thread") selects a ThreadPoolExecutor.
    para_n : maximum number of workers.

    Returns
    -------
    dict mapping variable name -> DataFrame (features x timesteps).
    """
    import concurrent.futures

    reng = "rasterio"
    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
    _u2d = MemoryDataset(
        _xds.U2D.values,
        transform=_xds.U2D.rio.transform(),
        gcps=None,
        rpcs=None,
        crs=None,
        copy=False,
    )
    ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
    # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
    # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
    # TODO: figure out why NOT using the rasterio engine produces a different result

    # Anything other than "process" falls back to threads (the old
    # elif/else branches both chose ThreadPoolExecutor).
    if para == "process":
        pool = concurrent.futures.ProcessPoolExecutor
    else:
        pool = concurrent.futures.ThreadPoolExecutor

    stats = []
    future_list = []
    n_features = len(feature_list)

    try:
        with pool(max_workers=para_n) as executor:
            for _i, _m in enumerate(map(polymask(_u2d), feature_list)):
                print(f"{_i}, {_i / n_features:.3%}".ljust(40), end="\r")
                mask, _, window = _m
                mask = xr.DataArray(mask, dims=["y", "x"])
                winslices = dict(zip(["y", "x"], window.toslices()))
                for ds in ds_list:
                    _t = ds.time.values[0]
                    future_list.append(
                        executor.submit(
                            xr_read_window_time,
                            ds,
                            winslices,
                            mask=mask,
                            idx=_i,
                            time=_t,
                        )
                    )
            for _f in concurrent.futures.as_completed(future_list):
                stats.append(_f.result())

        df_dict = {_v: pd.DataFrame(index=feature_index) for _v in var_list}
        for j, t, s in stats:
            for var in var_list:
                df_dict[var].loc[j, t] = s[var].mean()
    finally:
        # Plain loop instead of a side-effect comprehension; also close
        # the template dataset, which was previously leaked.
        for ds in ds_list:
            ds.close()
        _xds.close()

    return df_dict


def get_forcing_dict_newway_inverted(
    feature_index,
    feature_list,
    folder_prefix,
    file_list,
    var_list,
):
    """Compute per-feature mean forcing values, file-outer loop.

    Unlike ``get_forcing_dict_newway``, the masks/windows for every
    feature are computed first, then each file is opened once and read
    for all features — inverting the loop nesting so each dataset is
    traversed contiguously.

    Parameters
    ----------
    feature_index : index for the per-variable result DataFrames.
    feature_list : iterable of polygon geometries.
    folder_prefix : pathlib.Path of the directory holding the files.
    file_list : NetCDF file names (assumed one timestep each).
    var_list : variable names to aggregate.

    Returns
    -------
    dict mapping variable name -> DataFrame (features x timesteps).
    """
    reng = "rasterio"

    _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
    _u2d = MemoryDataset(
        _xds_dummy.U2D.values,
        transform=_xds_dummy.U2D.rio.transform(),
        gcps=None,
        rpcs=None,
        crs=None,
        copy=False,
    )
    ds_list = [
        xr.open_dataset(folder_prefix.joinpath(_nc_file), engine=reng)
        for _nc_file in file_list
    ]

    stats = []
    mask_win_list = []
    n_features = len(feature_list)
    n_files = len(file_list)

    try:
        # Pass 1: one mask/window per feature, computed once.
        for i, feature in enumerate(feature_list):
            print(f"{i}, {i / n_features:.3%}".ljust(40), end="\r")
            mask, _, window = polymask(_u2d)(feature)
            mask = xr.DataArray(mask, dims=["y", "x"])
            winslices = dict(zip(["y", "x"], window.toslices()))
            mask_win_list.append((mask, winslices))

        # Pass 2: every file read once for all features.
        for i, f in enumerate(ds_list):
            print(f"{i}, {i / n_files:.2%}".ljust(40), end="\r")
            time_value = f.time.values[0]
            # TODO: when we read the window, could the time be added as a dimension?
            for j, (_m, _w) in enumerate(mask_win_list):
                cropped = xr_read_window(f, _w, mask=_m)
                stats.append((j, time_value, cropped.mean()))

        df_dict = {_v: pd.DataFrame(index=feature_index) for _v in var_list}
        for j, t, s in stats:
            for var in var_list:
                df_dict[var].loc[j, t] = s[var]
    finally:
        # Plain close loop; also release the previously-leaked template.
        for ds in ds_list:
            ds.close()
        _xds_dummy.close()

    return df_dict


def get_forcing_dict_newway_inverted_parallel(
    feature_index,
    feature_list,
    folder_prefix,
    file_list,
    var_list,
    para="thread",
    para_n=2,
):
    """Parallel, file-outer-loop variant: precompute all masks, then
    submit every (file, feature) windowed read to a worker pool.

    Fixes relative to the draft: the first ``ds_list`` was opened with a
    hard-coded ``"data/" + f`` path (ignoring *folder_prefix*) and then
    leaked when re-assigned; ``stats``/``future_list`` were initialized
    twice; ``df_dict`` was built inside the pool and then discarded and
    rebuilt; the file-loop progress print used the stale feature-loop
    variable ``i``; the template dataset was never closed.

    Parameters
    ----------
    para : "process" selects a ProcessPoolExecutor; any other value
        selects a ThreadPoolExecutor.
    para_n : maximum number of workers.

    Returns
    -------
    dict mapping variable name -> DataFrame (features x timesteps).
    """
    import concurrent.futures

    reng = "rasterio"
    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
    _u2d = MemoryDataset(
        _xds.U2D.values,
        transform=_xds.U2D.rio.transform(),
        gcps=None,
        rpcs=None,
        crs=None,
        copy=False,
    )

    mask_win_list = []
    n_features = len(feature_list)
    n_files = len(file_list)

    # Pass 1: one mask/window per feature, computed once up front.
    for i, feature in enumerate(feature_list):
        print(f"{i}, {i / n_features:.3%}".ljust(40), end="\r")
        mask, _, window = polymask(_u2d)(feature)
        mask = xr.DataArray(mask, dims=["y", "x"])
        winslices = dict(zip(["y", "x"], window.toslices()))
        mask_win_list.append((mask, winslices))

    ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
    # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
    # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
    # TODO: figure out why NOT using the rasterio engine produces a different result

    if para == "process":
        pool = concurrent.futures.ProcessPoolExecutor
    else:
        pool = concurrent.futures.ThreadPoolExecutor

    stats = []
    future_list = []

    try:
        with pool(max_workers=para_n) as executor:
            for j, ds in enumerate(ds_list):
                # Progress over files (the draft printed the stale
                # feature-loop index here).
                print(f"{j}, {j / n_files:.2%}".ljust(40), end="\r")
                _t = ds.time.values[0]
                for _i, (_m, _w) in enumerate(mask_win_list):
                    future_list.append(
                        executor.submit(
                            xr_read_window_time, ds, _w, mask=_m, idx=_i, time=_t
                        )
                    )
            for _f in concurrent.futures.as_completed(future_list):
                stats.append(_f.result())

        df_dict = {_v: pd.DataFrame(index=feature_index) for _v in var_list}
        for j, t, s in stats:
            for var in var_list:
                df_dict[var].loc[j, t] = s[var].mean()
    finally:
        for ds in ds_list:
            ds.close()
        _xds.close()

    return df_dict
Loading