From 51dc6871ff4a917978378d7195134229771b5517 Mon Sep 17 00:00:00 2001 From: Adam Wlostowski <66840445+awlostowski-noaa@users.noreply.github.com> Date: Thu, 9 Dec 2021 15:05:26 -0500 Subject: [PATCH] Write flow at gage locations to netCDF (CHANOBS) file (#505) * create chanobs draft * move writing sequence to function in nhd_io * yaml documentation Co-authored-by: awlostowski-noaa --- doc/v3_doc.yaml | 16 ++- src/nwm_routing/src/nwm_routing/output.py | 23 +++- src/python_framework_v02/troute/nhd_io.py | 153 ++++++++++++++++++++++ 3 files changed, 189 insertions(+), 3 deletions(-) diff --git a/doc/v3_doc.yaml b/doc/v3_doc.yaml index 0c2fdb566..6f4d5576e 100644 --- a/doc/v3_doc.yaml +++ b/doc/v3_doc.yaml @@ -488,8 +488,22 @@ compute_parameters: # parameters controlling model outputs output_parameters: # --------------- + # chanobs writing parameters + # "CHANOBS" netCDF files contain timeseries of t-route simulated routed flow results + # at gage locations only. + # optional, only needed if writing data to CHANOBS (netCDF) files + chanobs_output: + # --------------- + # path to directory where chanobs files will be written + # (!!) mandatory if writing chanobs files + chanobs_output_directory: + # --------------- + # name of netCDF file where chanobs data will be written + # (!!) mandatory if writng chanobs files + chanobs_filepath: + # --------------- # csv writing parameters - # (!!) mandatory + # optional, only needed if writing data to CSV # TODO: remove output dependencies on csv_output field, here csv_output: # --------------- diff --git a/src/nwm_routing/src/nwm_routing/output.py b/src/nwm_routing/src/nwm_routing/output.py index a7f2271cc..80fd26110 100644 --- a/src/nwm_routing/src/nwm_routing/output.py +++ b/src/nwm_routing/src/nwm_routing/output.py @@ -2,7 +2,7 @@ import numpy as np import pandas as pd from pathlib import Path -from datetime import datetime +from datetime import datetime, timedelta import troute.nhd_io as nhd_io from build_tests import parity_check import logging @@ -73,6 +73,7 @@ def nwm_output_generator( rsrto = output_parameters.get("hydro_rst_output", None) chrto = output_parameters.get("chrtout_output", None) test = output_parameters.get("test_output", None) + chano = output_parameters.get("chanobs_output", None) if csv_output: csv_output_folder = output_parameters["csv_output"].get( @@ -81,7 +82,7 @@ def nwm_output_generator( csv_output_segments = csv_output.get("csv_output_segments", None) - if csv_output_folder or rsrto or chrto or test: + if csv_output_folder or rsrto or chrto or chano or test: start = time.time() qvd_columns = pd.MultiIndex.from_product( @@ -213,6 +214,24 @@ def nwm_output_generator( LOG.debug("writing CSV file took %s seconds." % (time.time() - start)) # usgs_df_filtered = usgs_df[usgs_df.index.isin(csv_output_segments)] # usgs_df_filtered.to_csv(output_path.joinpath("usgs_df.csv")) + + if chano: + + LOG.info("- writing t-route flow results at gage locations to CHANOBS file") + start = time.time() + + nhd_io.write_chanobs( + Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']), + flowveldepth, + link_gage_df, + t0, + dt, + nts, + # TODO allow user to pass a list of segments at which they would like to print results + # rather than just printing at gages. + ) + + LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start)) # Write out LastObs as netcdf. # Assumed that this capability is only needed for AnA simulations diff --git a/src/python_framework_v02/troute/nhd_io.py b/src/python_framework_v02/troute/nhd_io.py index 93fab9e71..6356053ca 100644 --- a/src/python_framework_v02/troute/nhd_io.py +++ b/src/python_framework_v02/troute/nhd_io.py @@ -15,6 +15,7 @@ import time import logging from joblib import delayed, Parallel +from cftime import date2num @@ -345,6 +346,158 @@ def get_ql_from_wrf_hydro_mf( def drop_all_coords(ds): return ds.reset_coords(drop=True) +def write_chanobs( + chanobs_filepath, + flowveldepth, + link_gage_df, + t0, + dt, + nts +): + + ''' + Write results at gage locations to netcdf. + If the user specified file does not exist, create it. + If the user specified file already exiss, append it. + + Arguments + ------------- + chanobs_filepath (Path or string) - + flowveldepth (DataFrame) - t-route flow velocity and depth results + link_gage_df (DataFrame) - linkIDs of gages in network + t0 (datetime) - initial time + dt (int) - timestep duration (seconds) + nts (int) - number of timesteps in simulation + + Returns + ------------- + + ''' + + # array of segment linkIDs at gage locations. Results from these segments will be written + gage_feature_id = link_gage_df.index.to_numpy(dtype = "int32") + + # array of simulated flow data at gage locations + gage_flow_data = flowveldepth.loc[link_gage_df.index].iloc[:,::3].to_numpy(dtype="float32") + + # array of simulation time + gage_flow_time = [t0 + timedelta(seconds = (i+1) * dt) for i in range(nts)] + + if not chanobs_filepath.is_file(): + + # if no chanobs file exists, create a new one + # open netCDF4 Dataset in write mode + with netCDF4.Dataset( + filename = chanobs_filepath, + mode = 'w', + format = "NETCDF4" + ) as f: + + # =========== DIMENSIONS =============== + _ = f.createDimension("time", None) + _ = f.createDimension("feature_id", len(gage_feature_id)) + _ = f.createDimension("reference_time", 1) + + # =========== time VARIABLE =============== + TIME = f.createVariable( + varname = "time", + datatype = 'int32', + dimensions = ("time",), + ) + TIME[:] = date2num( + gage_flow_time, + units = "minutes since 1970-01-01 00:00:00 UTC", + calendar = "gregorian" + ) + f['time'].setncatts( + { + 'long_name': 'model initialization time', + 'standard_name': 'forecast_reference_time', + 'units': 'minutes since 1970-01-01 00:00:00 UTC' + } + ) + + # =========== reference_time VARIABLE =============== + REF_TIME = f.createVariable( + varname = "reference_time", + datatype = 'int32', + dimensions = ("reference_time",), + ) + REF_TIME[:] = date2num( + t0, + units = "minutes since 1970-01-01 00:00:00 UTC", + calendar = "gregorian" + ) + f['reference_time'].setncatts( + { + 'long_name': 'vaild output time', + 'standard_name': 'time', + 'units': 'minutes since 1970-01-01 00:00:00 UTC' + } + ) + + # =========== feature_id VARIABLE =============== + FEATURE_ID = f.createVariable( + varname = "feature_id", + datatype = 'int32', + dimensions = ("feature_id",), + ) + FEATURE_ID[:] = gage_feature_id + f['feature_id'].setncatts( + { + 'long_name': 'Reach ID', + 'comment': 'NHDPlusv2 ComIDs within CONUS, arbitrary Reach IDs outside of CONUS', + 'cf_role:': 'timeseries_id' + } + ) + + # =========== streamflow VARIABLE =============== + y = f.createVariable( + varname = "streamflow", + datatype = "f4", + dimensions = ("time", "feature_id"), + fill_value = np.nan + ) + y[:] = gage_flow_data.reshape( + len(gage_flow_time), + len(gage_feature_id) + ) + + # =========== GLOBAL ATTRIBUTES =============== + f.setncatts( + { + 'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'), + 'model_output_valid_time': gage_flow_time[0].strftime('%Y-%m-%d_%H:%M:%S'), + } + ) + + else: + + # append data to chanobs file + # open netCDF4 Dataset in r+ mode to append + with netCDF4.Dataset( + filename = chanobs_filepath, + mode = 'r+', + format = "NETCDF4" + ) as f: + + # =========== format variable data to be appended =============== + time_new = date2num( + gage_flow_time, + units = "minutes since 1970-01-01 00:00:00 UTC", + calendar = "gregorian" + ) + + flow_new = gage_flow_data.reshape( + len(gage_flow_time), + len(gage_feature_id) + ) + + # =========== append new flow data =============== + tshape = len(f.dimensions['time']) + f['time'][tshape:(tshape+nts)] = time_new + f['streamflow'][tshape:(tshape+nts)] = flow_new + def write_to_netcdf(f, variables, datatype = 'f4'): '''