From 528d0a51f13eccdb3a3ee413cfc9037f0734e7b7 Mon Sep 17 00:00:00 2001 From: EstelleMing <1714424170@qq.com> Date: Mon, 11 Nov 2024 16:30:54 +0800 Subject: [PATCH] Added CAMELS_AUS_v2.0.0 data; Modify the calculation of the function unit of read_target_cols() --only streamflow_MLd; Added unit selection to read_mean_prcp(). --- examples/aus_test.py | 62 +++++++++++++++ examples/aus_v2_test.py | 41 ++++++++++ examples/scripts.py | 29 +++++++ hydrodataset/__init__.py | 2 +- hydrodataset/camels.py | 166 ++++++++++++++++++++++++++++++++++++--- 5 files changed, 286 insertions(+), 14 deletions(-) create mode 100644 examples/aus_test.py create mode 100644 examples/aus_v2_test.py diff --git a/examples/aus_test.py b/examples/aus_test.py new file mode 100644 index 0000000..7b51b42 --- /dev/null +++ b/examples/aus_test.py @@ -0,0 +1,62 @@ +import os +import numpy as np +import pandas as pd +from tqdm import tqdm +from hydrodataset import Camels + +directory = 'basin_flow' +if not os.path.exists(directory): + os.makedirs(directory) + +camels_aus_path = "/ftproot/camels/camels_aus/" +camels_aus_v2_path="/home/estelle/data/waterism/datasets-origin/camels/camels_aus_v2/" + +aus_region = "AUS" +aus_v2_region = "AUS_v2" +# ------------------------------ AUS -------------------------------- +camels_aus = Camels(camels_aus_path, download=False, region=aus_region) +camels_aus_v2=Camels(camels_aus_v2_path, download=False, region=aus_v2_region) +gage_ids = camels_aus.read_object_ids() + +# Return -> np.array: forcing data +hydro_info = camels_aus.read_relevant_cols( + gage_ids[:], + ["2015-01-01", "2022-02-15"], + ["et_morton_point_SILO", "precipitation_SILO", "et_morton_actual_SILO"] +) + +gages_to_nan = ['403213A', '224213A', '224214A', '227225A'] + +# 1 Megaliters Per Day = 0.011574074074074 Cubic Meters Per Second +# ML_to_m3_per_s = 0.011574074074074 + +t_info = pd.date_range(start="2015-01-01", end="2022-02-14", freq='D') +formatted_time = t_info.strftime('%Y-%m-%d %H:%M:%S') + +for i, gage_id in enumerate(gage_ids): + hydro_data = hydro_info[i] + + if gage_id in gages_to_nan: + streamflow_data_m3_per_s = np.nan * np.ones_like(hydro_data[:, 0]) + else: + # Return -> np.array: streamflow data, 3-dim [station, time, streamflow(ML/d)] + streamflow_info = camels_aus_v2.read_target_cols( + gage_ids[i:i+1], + ["2015-01-01", "2022-02-15"], + target_cols=["streamflow_MLd"], + ) + streamflow_data_m3_per_s = (streamflow_info[0,:,0]/35.314666721489) + + pet = hydro_data[:, 0] + prcp = hydro_data[:, 1] + flow = streamflow_data_m3_per_s + et = hydro_data[:, 2] + node1_flow = np.nan * np.ones_like(flow) # NA for node1_flow + merged_row = np.column_stack([formatted_time, pet, prcp, flow, et, node1_flow]) + # tiem pet(mm/day) prcp(mm/day) flow(m^3/s) et(mm/day) node1_flow(m^3/s) + columns = ["time", "pet(mm/day)", "prcp(mm/day)", "flow(m^3/s)", "et(mm/day)", "node1_flow(m^3/s)"] + df = pd.DataFrame(merged_row, columns=columns) + filename = f'basin_{gage_id}.csv' + file_path = os.path.join(directory, filename) + df.to_csv(file_path, index=False) + diff --git a/examples/aus_v2_test.py b/examples/aus_v2_test.py new file mode 100644 index 0000000..6b0f720 --- /dev/null +++ b/examples/aus_v2_test.py @@ -0,0 +1,41 @@ +import os +import numpy as np + +from hydrodataset import Camels + +camels_aus_v2_path="/home/estelle/data/waterism/datasets-origin/camels/camels_aus_v2/" + +aus_v2_region = "AUS_v2" + +# ---------------------------- AUS-V2 ------------------------------- +camels_aus_v2=Camels(camels_aus_v2_path, download=False, region=aus_v2_region) + +gage_ids = camels_aus_v2.read_object_ids() + +p_mean_info=camels_aus_v2.read_mean_prcp( + gage_ids[:5],unit="mm/h" +) +print(p_mean_info) + +attrs = camels_aus_v2.read_constant_cols( + gage_ids[:5], var_lst=["catchment_area", "geol_sec", "metamorph"] +) +print(attrs) +forcings = camels_aus_v2.read_relevant_cols( + gage_ids[:5], + ["1990-01-01", "2010-01-01"], + var_lst=["precipitation_AGCD", "et_morton_actual_SILO", "tmin_SILO"], +) +print(forcings.shape) +flows = camels_aus_v2.read_target_cols( + gage_ids[:5], + ["2015-01-01", "2022-01-01"], + target_cols=["streamflow_MLd", "streamflow_mmd"], +) +print(flows) +streamflow_types = camels_aus_v2.get_target_cols() +print(streamflow_types) +focing_types = camels_aus_v2.get_relevant_cols() +print(focing_types) +attr_types = camels_aus_v2.get_constant_cols() +print(attr_types) \ No newline at end of file diff --git a/examples/scripts.py b/examples/scripts.py index 16820d9..ba96649 100644 --- a/examples/scripts.py +++ b/examples/scripts.py @@ -18,7 +18,9 @@ camels_cl_path = os.path.join("camels", "camels_cl") camels_gb_path = os.path.join("camels", "camels_gb") camels_us_path = os.path.join("camels", "camels_us") +camels_aus_v2_path=os.path.join("camels","camels_aus_v2") +aus_v2_region = "AUS_v2" aus_region = "AUS" br_region = "BR" cl_region = "CL" @@ -171,6 +173,33 @@ attr_types[:3], np.array(["station_name", "drainage_division", "river_region"]) ) +# # ---------------------------- AUS-V2 ------------------------------- +camels_aus_v2=Camels(camels_aus_v2_path, download=False, region=aus_v2_region) +gage_ids = camels_aus_v2.read_object_ids() +assert gage_ids.size == 561 +attrs = camels_aus_v2.read_constant_cols( + gage_ids[:5], var_lst=["catchment_area", "geol_sec", "metamorph"] +) +print(attrs) +forcings = camels_aus_v2.read_relevant_cols( + gage_ids[:5], + ["1990-01-01", "2010-01-01"], + var_lst=["precipitation_AGCD", "et_morton_actual_SILO", "tmin_SILO"], +) +print(forcings.shape) +flows = camels_aus_v2.read_target_cols( + gage_ids[:5], + ["1990-01-01", "2010-01-01"], + target_cols=["streamflow_MLd", "streamflow_mmd"], +) +print(flows.shape) +streamflow_types = camels_aus_v2.get_target_cols() +print(streamflow_types) +focing_types = camels_aus_v2.get_relevant_cols() +print(focing_types) +attr_types = camels_aus_v2.get_constant_cols() +print(attr_types) + # ------------------------------ BR -------------------------------- camels_br = Camels(camels_br_path, download=False, region=br_region) gage_ids = camels_br.read_object_ids() diff --git a/hydrodataset/__init__.py b/hydrodataset/__init__.py index a17fafd..b076f23 100644 --- a/hydrodataset/__init__.py +++ b/hydrodataset/__init__.py @@ -76,7 +76,7 @@ def read_setting(setting_path): # set some constants for datasets DATASETS = ["CAMELS", "Caravan", "GRDC", "HYSETS", "LamaH", "MOPEX"] -CAMELS_REGIONS = ["AUS", "BR", "CL", "GB", "US"] +CAMELS_REGIONS = ["AUS", "BR", "CL", "GB", "US", "AUS_v2"] LAMAH_REGIONS = ["CE"] # For CANOPEX, We don't treat it as a dataset, but a special case for MOPEX. We only have CANOPEX now. MOPEX_REGIONS = ["CA"] diff --git a/hydrodataset/camels.py b/hydrodataset/camels.py index 650f62d..a501f3a 100644 --- a/hydrodataset/camels.py +++ b/hydrodataset/camels.py @@ -139,6 +139,8 @@ def set_data_source_describe(self) -> collections.OrderedDict: return self._set_data_source_camelscl_describe(camels_db) elif self.region == "GB": return self._set_data_source_camelsgb_describe(camels_db) + elif self.region == "AUS_v2": + return self._set_data_source_camelsausv2_describe(camels_db) else: raise NotImplementedError(CAMELS_NO_DATASET_ERROR_LOG) @@ -329,6 +331,36 @@ def _set_data_source_camelsbr_describe(self, camels_db): CAMELS_BASINS_SHP_FILE=camels_shp_file, ) + def _set_data_source_camelsausv2_describe(self, camels_db): + # id and name + gauge_id_file = camels_db.joinpath( + "01_id_name_metadata", + "01_id_name_metadata", + "id_name_metadata.csv", + ) + # shp file of basins + camels_shp_file = camels_db.joinpath( + "02_location_boundary_area", + "02_location_boundary_area", + "shp", + "CAMELS_AUS_v2_BasinOutlets_adopted.shp", + ) + # config of flow data + flow_dir = camels_db.joinpath("03_streamflow", "03_streamflow") + # attr + attr_dir = camels_db.joinpath("04_attributes", "04_attributes") + # forcing + forcing_dir = camels_db.joinpath("05_hydrometeorology", "05_hydrometeorology") + + return collections.OrderedDict( + CAMELS_DIR=camels_db, + CAMELS_FLOW_DIR=flow_dir, + CAMELS_FORCING_DIR=forcing_dir, + CAMELS_ATTR_DIR=attr_dir, + CAMELS_GAUGE_FILE=gauge_id_file, + CAMELS_BASINS_SHP_FILE=camels_shp_file, + ) + def _set_data_source_camelsaus_describe(self, camels_db): # id and name gauge_id_file = camels_db.joinpath( @@ -463,6 +495,8 @@ def read_site_info(self) -> pd.DataFrame: ) elif self.region == "AUS": data = pd.read_csv(camels_file, sep=",", dtype={"station_id": str}) + elif self.region == "AUS_v2": + data = pd.read_csv(camels_file, sep=",", dtype={"station_id": str}) elif self.region == "BR": data = pd.read_csv(camels_file, sep="\s+", dtype={"gauge_id": str}) elif self.region == "CL": @@ -493,6 +527,14 @@ def get_constant_cols(self) -> np.ndarray: camels_aus_attr_indices_data = pd.read_csv(attr_all_file, sep=",") # exclude station id return camels_aus_attr_indices_data.columns.values[1:] + elif self.region == "AUS_v2": + attr_all_file = os.path.join( + self.data_source_description["CAMELS_DIR"], + "CAMELS_AUS_Attributes&Indices_MasterTable.csv", + ) + camels_aus_v2_attr_indices_data = pd.read_csv(attr_all_file, sep=",") + # exclude station id + return camels_aus_v2_attr_indices_data.columns.values[1:] elif self.region == "BR": return self._get_constant_cols_some( data_folder, "camels_br_", ".txt", "\s+" @@ -545,6 +587,17 @@ def get_relevant_cols(self) -> np.ndarray: file[:-4] for file in files if file != "ClimaticIndices.csv" ) return np.array(forcing_types) + elif self.region == "AUS_v2": + forcing_types = [] + for root, dirs, files in os.walk( + self.data_source_description["CAMELS_FORCING_DIR"] + ): + if root == self.data_source_description["CAMELS_FORCING_DIR"]: + continue + forcing_types.extend( + file[:-4] for file in files if file not in ["ClimaticIndices.csv", "desktop.ini"] + ) + return np.array(forcing_types) elif self.region == "BR": return np.array( [ @@ -602,6 +655,18 @@ def get_target_cols(self) -> np.ndarray: "streamflow_QualityCodes", ] ) + elif self.region == "AUS_v2": + # QualityCodes are not streamflow data. + # MLd means "1 Megaliters Per Day"; 1 MLd = 0.011574074074074 cubic-meters-per-second + # mmd means "mm/day" + return np.array( + [ + "streamflow_MLd", + "streamflow_MLd_inclInfilled", + "streamflow_mmd", + "streamflow_QualityCodes", + ] + ) elif self.region == "BR": return np.array( [ @@ -637,7 +702,7 @@ def read_object_ids(self, **kwargs) -> np.ndarray: """ if self.region in ["BR", "GB", "US"]: return self.sites["gauge_id"].values - elif self.region == "AUS": + elif self.region in ["AUS", "AUS_v2"]: return self.sites["station_id"].values elif self.region == "CL": station_ids = self.sites.columns.values @@ -870,6 +935,30 @@ def read_target_cols( chosen_data = flow_data[gage_id_lst].values[ind1, :] chosen_data[chosen_data < 0] = np.nan y[:, ind2, k] = chosen_data.T + # ML/d-->m3/s + if target_cols[k] == "streamflow_MLd": + y = y / 84.6 + elif self.region == "AUS_v2": + for k in tqdm( + range(len(target_cols)), desc="Read streamflow data of CAMELS-AUS-V2" + ): + flow_data = pd.read_csv( + os.path.join( + self.data_source_description["CAMELS_FLOW_DIR"], + target_cols[k] + ".csv", + ) + ) + df_date = flow_data[["year", "month", "day"]] + date = pd.to_datetime(df_date).values.astype("datetime64[D]") + [c, ind1, ind2] = np.intersect1d( + date, t_range_list, return_indices=True + ) + chosen_data = flow_data[gage_id_lst].values[ind1, :] + chosen_data[chosen_data < 0] = np.nan + y[:, ind2, k] = chosen_data.T + # ML/d-->m3/s + if target_cols[k] == "streamflow_MLd": + y = y / 84.6 elif self.region == "BR": for j in tqdm( range(len(target_cols)), desc="Read streamflow data of CAMELS-BR" @@ -931,10 +1020,8 @@ def read_target_cols( else: raise NotImplementedError(CAMELS_NO_DATASET_ERROR_LOG) # Keep unit of streamflow unified: we use ft3/s here - # unit of flow in AUS is MegaLiter/day -> ft3/s - if self.region == "AUS": - y = y / 86.4 * 35.314666721489 - elif self.region != "US": + # unit of flow in AUS is MegaLiter/day -> m3/s + if self.region != "US": # other units are m3/s -> ft3/s y = y * 35.314666721489 return y @@ -1207,6 +1294,42 @@ def read_relevant_cols( ) chosen_data = forcing_data[gage_id_lst].values[ind1, :] x[:, ind2, k] = chosen_data.T + elif self.region == "AUS_v2": + for k in tqdm(range(len(var_lst)), desc="Read forcing data of CAMELS-AUS-V2"): + if "precipitation_" in var_lst[k]: + forcing_dir = os.path.join( + self.data_source_description["CAMELS_FORCING_DIR"], + "01_precipitation_timeseries", + ) + elif "et_" in var_lst[k] or "evap_" in var_lst[k]: + forcing_dir = os.path.join( + self.data_source_description["CAMELS_FORCING_DIR"], + "02_EvaporativeDemand_timeseries", + ) + elif "_AGCD" in var_lst[k]: + forcing_dir = os.path.join( + self.data_source_description["CAMELS_FORCING_DIR"], + "03_Other", + "AGCD", + ) + elif "_SILO" in var_lst[k]: + forcing_dir = os.path.join( + self.data_source_description["CAMELS_FORCING_DIR"], + "03_Other", + "SILO", + ) + else: + raise NotImplementedError(CAMELS_NO_DATASET_ERROR_LOG) + forcing_data = pd.read_csv( + os.path.join(forcing_dir, var_lst[k] + ".csv") + ) + df_date = forcing_data[["year", "month", "day"]] + date = pd.to_datetime(df_date).values.astype("datetime64[D]") + [c, ind1, ind2] = np.intersect1d( + date, t_range_list, return_indices=True + ) + chosen_data = forcing_data[gage_id_lst].values[ind1, :] + x[:, ind2, k] = chosen_data.T elif self.region == "BR": for j in tqdm(range(len(var_lst)), desc="Read forcing data of CAMELS-BR"): for k in tqdm(range(len(gage_id_lst))): @@ -1313,6 +1436,12 @@ def read_attr_all_in_one_file(self): "CAMELS_AUS_Attributes-Indices_MasterTable.csv", ) all_attr = pd.read_csv(attr_all_file, sep=",") + elif self.region == "AUS_v2": + attr_all_file = os.path.join( + self.data_source_description["CAMELS_DIR"], + "CAMELS_AUS_Attributes&Indices_MasterTable.csv", + ) + all_attr = pd.read_csv(attr_all_file, sep=",") elif self.region == "CL": attr_all_file = os.path.join( self.data_source_description["CAMELS_ATTR_DIR"], @@ -1406,7 +1535,7 @@ def read_constant_cols( """ if self.region in ["BR", "GB", "US"]: attr_all, var_lst_all, var_dict, f_dict = self.read_attr_all() - elif self.region in ["AUS", "CL"]: + elif self.region in ["AUS", "AUS_v2", "CL"]: attr_all, var_lst_all, var_dict, f_dict = self.read_attr_all_in_one_file() else: raise NotImplementedError(CAMELS_NO_DATASET_ERROR_LOG) @@ -1421,7 +1550,7 @@ def read_constant_cols( def read_area(self, gage_id_lst) -> np.ndarray: if self.region == "US": return self.read_attr_xrdataset(gage_id_lst, ["area_gages2"]) - elif self.region == "AUS": + elif self.region in ["AUS", "AUS_v2"]: return self.read_constant_cols( gage_id_lst, ["catchment_area"], is_return_dict=False ) @@ -1430,20 +1559,31 @@ def read_area(self, gage_id_lst) -> np.ndarray: else: raise NotImplementedError(CAMELS_NO_DATASET_ERROR_LOG) - def read_mean_prcp(self, gage_id_lst) -> np.ndarray: - if self.region in ["US", "AUS", "BR", "GB"]: + def read_mean_prcp(self, gage_id_lst, unit="mm/d") -> np.ndarray: + if self.region in ["US", "AUS", "AUS_v2", "BR", "GB"]: if self.region == "US": - return self.read_attr_xrdataset(gage_id_lst, ["p_mean"]) - return self.read_constant_cols( - gage_id_lst, ["p_mean"], is_return_dict=False + data = self.read_attr_xrdataset(gage_id_lst, ["p_mean"]) + data = self.read_constant_cols( + gage_id_lst, ["p_mean"], is_return_dict=False, ) elif self.region == "CL": # there are different p_mean values for different forcings, here we chose p_mean_cr2met now - return self.read_constant_cols( + data = self.read_constant_cols( gage_id_lst, ["p_mean_cr2met"], is_return_dict=False ) else: raise NotImplementedError(CAMELS_NO_DATASET_ERROR_LOG) + if unit in ["mm/d", "mm/day"]: + converted_data = data + elif unit in ["mm/h", "mm/hour"]: + converted_data = data / 24 + elif unit in ["mm/3h", "mm/3hour"]: + converted_data = data / 8 + else: + raise ValueError( + "unit must be one of ['mm/d', 'mm/day', 'mm/h', 'mm/hour', 'mm/3h', 'mm/3hour']" + ) + return converted_data def cache_forcing_np_json(self): """