Skip to content

Commit

Permalink
update cross_valid_data and split_train_test funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
OuyangWenyu committed Mar 25, 2024
1 parent a3791dd commit 535af52
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 109 deletions.
195 changes: 87 additions & 108 deletions hydromodel/datasets/data_preprocess.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Author: Wenyu Ouyang
Date: 2022-10-25 21:16:22
LastEditTime: 2024-03-25 17:19:38
LastEditTime: 2024-03-25 19:54:15
LastEditors: Wenyu Ouyang
Description: preprocess data for models in hydro-model-xaj
FilePath: \hydro-model-xaj\hydromodel\datasets\data_preprocess.py
Expand Down Expand Up @@ -278,16 +278,14 @@ def process_and_save_data_as_nc(
return True


def split_train_test(ts_file, train_period, test_period):
    """
    Split a time series dataset into training and testing parts.

    Parameters
    ----------
    ts_file
        nc file of all time series data; it is opened with ``xr.open_dataset``
    train_period
        training period; its first two items are the start and end dates
    test_period
        testing period; its first two items are the start and end dates

    Returns
    -------
    tuple
        ``(train_data, test_data)``, i.e. the parts of the dataset whose
        ``time`` coordinate lies in the training / testing period
    """
    ts_data = xr.open_dataset(ts_file)

    def _select(period):
        # Convert the date strings to pandas datetime objects, then slice the
        # time coordinate by label.
        start, end = pd.to_datetime(period[0]), pd.to_datetime(period[1])
        return ts_data.sel(time=slice(start, end))

    return _select(train_period), _select(test_period)


def validate_freq(freq):
    """
    Validate if the freq string is a valid pandas frequency.

    The check is done by parsing ``"1" + freq`` with ``pd.to_timedelta``.

    Parameters
    ----------
    freq : str
        Frequency string to validate, such as "D" or "3D".

    Returns
    -------
    bool
        True if the freq string is valid, False otherwise (including when
        ``freq`` is not a string at all).
    """
    if not isinstance(freq, str):
        # String concatenation below would raise TypeError; the contract of
        # this function is to answer False for anything that is not valid.
        return False
    try:
        # Prefix a digit so that a bare unit such as "D" is parsed as well.
        pd.to_timedelta("1" + freq)
    except ValueError:
        return False
    return True


def cross_valid_data(ts_file, period, warmup, cv_fold, freq="1D"):
    """
    Split all data to train and test parts with same format for cross validation.

    Parameters
    ----------
    ts_file : str
        Path to the NetCDF file of time series data.
    period : tuple of str
        The whole period in the format ("start_date", "end_date").
    warmup : int
        Warmup period length, counted in time steps of ``freq``.
    cv_fold : int
        Number of folds for cross-validation.
    freq : str
        Length of one time step used to build the date sequence of
        ``period``, such as "1D".

    Returns
    -------
    list of tuples
        Each tuple contains training and testing datasets for a fold; both
        datasets are prepended with the ``warmup`` time steps that precede
        their first date.

    Raises
    ------
    ValueError
        If ``freq`` is rejected by ``validate_freq`` or ``warmup`` is negative.
    """
    if not validate_freq(freq):
        raise ValueError(
            "Time unit must be number with either 'Y','M','W','D','h','m' or 's', such as 3D."
        )
    if warmup < 0:
        # A negative value would silently slice from the end of the dates.
        raise ValueError("warmup must be a non-negative number of time steps.")
    ts_data = xr.open_dataset(ts_file)

    # Convert the whole period to pandas datetime
    start_date, end_date = pd.to_datetime(period[0]), pd.to_datetime(period[1])
    date_lst = pd.date_range(start=start_date, end=end_date, freq=freq)
    # Only the dates after the leading warmup steps are split into folds
    date_rm_warmup = date_lst[warmup:]

    def _select_with_warmup(dates):
        # Build the warmup steps that end right before the first date, using
        # the specified frequency, and select them together with the dates.
        warmup_dates = pd.date_range(end=dates[0], periods=warmup + 1, freq=freq)[
            :-1
        ]
        return ts_data.sel(time=dates.union(warmup_dates))

    # KFold split without shuffling
    kf = KFold(n_splits=cv_fold, shuffle=False)
    return [
        (
            _select_with_warmup(date_rm_warmup[train_index]),
            _select_with_warmup(date_rm_warmup[test_index]),
        )
        for train_index, test_index in kf.split(date_rm_warmup)
    ]
55 changes: 54 additions & 1 deletion test/test_data_preprocess.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
from hydrodataset import Camels
import numpy as np
import pytest
import os
import pandas as pd
import xarray as xr
from sklearn.model_selection import KFold

from hydromodel import SETTING
from hydromodel.datasets import *
from hydromodel.datasets.data_preprocess import process_and_save_data_as_nc
from hydromodel.datasets.data_preprocess import (
process_and_save_data_as_nc,
split_train_test,
)
from hydromodel.datasets.data_preprocess import check_tsdata_format
from hydromodel.datasets.data_preprocess import check_basin_attr_format
from hydromodel.datasets.data_preprocess import check_folder_contents
from hydromodel.datasets.data_preprocess import cross_valid_data


@pytest.fixture()
Expand Down Expand Up @@ -253,3 +259,50 @@ def test_load_dataset():
["01013500"], ["2010-01-01", "2014-01-01"], ["streamflow"]
)
print(data)


def create_temp_netCDF(tmp_path, periods=10):
    """
    Create a temporary NetCDF file of time series data for tests.

    Parameters
    ----------
    tmp_path
        directory in which ``time_series.nc`` is written
    periods
        number of time steps, starting from 2022-01-01

    Returns
    -------
    str
        path of the written NetCDF file
    """
    ts_file = tmp_path / "time_series.nc"
    basins = ["basin1", "basin2", "basin3"]
    shape = (periods, len(basins))
    # Seeded generator so that the test data is the same on every run
    rng = np.random.default_rng(0)
    data = xr.Dataset(
        {
            "flow": (("time", "basin"), rng.random(shape)),
            "prcp": (("time", "basin"), rng.random(shape)),
        },
        coords={
            "time": pd.date_range(start="2022-01-01", periods=periods),
            "basin": basins,
        },
    )
    data.to_netcdf(ts_file)
    return str(ts_file)


@pytest.fixture
def ts_file_fixture(tmp_path):
    """Path of a temporary NetCDF time series file created in ``tmp_path``."""
    return create_temp_netCDF(tmp_path)


def test_cross_valid_data(ts_file_fixture):
    """cross_valid_data returns one (train, test) pair per fold, with warmup."""
    period = ("2022-01-01", "2022-01-10")
    warmup = 3
    cv_fold = 3

    train_test_data = cross_valid_data(ts_file_fixture, period, warmup, cv_fold)

    assert len(train_test_data) == cv_fold
    # 10 days minus 3 warmup days leave 7 days, split in order into folds of
    # 3, 2 and 2 days; each test part is its fold plus the 3 warmup days
    # before it, each train part is the remaining days plus 3 warmup days
    # before the first of them.
    assert [len(test.time) for _, test in train_test_data] == [6, 5, 5]
    assert [len(train.time) for train, _ in train_test_data] == [7, 8, 8]


def test_split_train_test(ts_file_fixture):
    """split_train_test returns the data of each period, end dates included."""
    # Define the train and test periods
    train_period = ("2022-01-01", "2022-01-05")
    test_period = ("2022-01-06", "2022-01-10")

    # Call the function to split the data
    train_data, test_data = split_train_test(ts_file_fixture, train_period, test_period)

    # Assert that the train and test data have the correct length and shape;
    # one condition per assert so that a failure names the broken property
    basins = ["basin1", "basin2", "basin3"]
    assert len(train_data.time) == 5
    assert train_data.flow.shape == (5, len(basins))
    assert len(test_data.time) == 5
    assert test_data.flow.shape == (5, len(basins))

    # Each part must start and end exactly on the requested dates
    assert pd.Timestamp(train_data.time.values[0]) == pd.Timestamp(train_period[0])
    assert pd.Timestamp(train_data.time.values[-1]) == pd.Timestamp(train_period[1])
    assert pd.Timestamp(test_data.time.values[0]) == pd.Timestamp(test_period[0])
    assert pd.Timestamp(test_data.time.values[-1]) == pd.Timestamp(test_period[1])

0 comments on commit 535af52

Please sign in to comment.