diff --git a/arch/conftest.py b/arch/conftest.py index d5ebbad541..972b49c713 100644 --- a/arch/conftest.py +++ b/arch/conftest.py @@ -2,6 +2,7 @@ pytest_plugins = [ "arch.tests.unitroot.cointegration_data", + "arch.tests.covariance.covariance_data", ] diff --git a/arch/covariance/kernel.py b/arch/covariance/kernel.py index 43d37ba68b..4085a441ad 100644 --- a/arch/covariance/kernel.py +++ b/arch/covariance/kernel.py @@ -189,6 +189,7 @@ class CovarianceEstimator(ABC): def __init__( self, x: ArrayLike, + *, bandwidth: Optional[float] = None, df_adjust: int = 0, center: bool = True, diff --git a/arch/covariance/var.py b/arch/covariance/var.py index 713c14d153..299bd5b5cb 100644 --- a/arch/covariance/var.py +++ b/arch/covariance/var.py @@ -38,6 +38,7 @@ class PreWhitenRecoloredCovariance(CovarianceEstimator): df_adjust : int, default 0 center : bool, default True weights : array_like, default None + force_int: bool, default False See Also -------- @@ -52,6 +53,7 @@ class PreWhitenRecoloredCovariance(CovarianceEstimator): def __init__( self, x: ArrayLike, + *, lags: Optional[int] = None, method: str = "aic", diagonal: bool = True, @@ -62,9 +64,15 @@ def __init__( df_adjust: int = 0, center: bool = True, weights: Optional[ArrayLike] = None, + force_int: bool = False, ) -> None: super().__init__( - x, bandwidth=bandwidth, df_adjust=df_adjust, center=center, weights=weights + x, + bandwidth=bandwidth, + df_adjust=df_adjust, + center=center, + weights=weights, + force_int=force_int, ) self._kernel_name = kernel self._lags = 0 @@ -294,7 +302,12 @@ def cov(self) -> CovarianceEstimate: resids = var_mod.resids nobs, nvar = resids.shape self._kernel_instance = self._kernel( - resids, self._bandwidth, 0, False, self._x_weights, self._force_int + resids, + bandwidth=self._bandwidth, + df_adjust=0, + center=False, + weights=self._x_weights, + force_int=self._force_int, ) kern_cov = self._kernel_instance.cov short_run = kern_cov.short_run diff --git a/arch/tests/covariance/covariance_data.py b/arch/tests/covariance/covariance_data.py new file mode 100644 index 0000000000..9eb7305b8d --- /dev/null +++ b/arch/tests/covariance/covariance_data.py @@ -0,0 +1,45 @@ +from itertools import product + +import numpy as np +import pandas as pd +import pytest + +DATA_PARAMS = list(product([1, 3], [True, False], [0])) # , 1, 3])) +DATA_IDS = [f"dim: {d}, pandas: {p}, order: {o}" for d, p, o in DATA_PARAMS] + + +@pytest.fixture(scope="module", params=DATA_PARAMS, ids=DATA_IDS) +def covariance_data(request): + dim, pandas, order = request.param + rs = np.random.RandomState([839084, 3823810, 982103, 829108]) + burn = 100 + shape = (burn + 500,) + if dim > 1: + shape += (3,) + rvs = rs.standard_normal(shape) + phi = np.zeros((order, dim, dim)) + if order > 0: + phi[0] = np.eye(dim) * 0.4 + 0.1 + for i in range(1, order): + phi[i] = 0.3 / (i + 1) * np.eye(dim) + for i in range(order, burn + 500): + for j in range(order): + if dim == 1: + rvs[i] += np.squeeze(phi[j] * rvs[i - j - 1]) + else: + rvs[i] += phi[j] @ rvs[i - j - 1] + if order > 1: + p = np.eye(dim * order, dim * order, -dim) + for j in range(order): + p[:dim, j * dim : (j + 1) * dim] = phi[j] + v, _ = np.linalg.eig(p) + assert np.max(np.abs(v)) < 1 + rvs = rvs[burn:] + if pandas and dim == 1: + return pd.Series(rvs, name="x") + elif pandas: + df = pd.DataFrame(rvs, columns=[f"x{i}" for i in range(dim)]) + df.to_csv("cov-data.csv") + return df + + return rvs diff --git a/arch/tests/covariance/test_var.py b/arch/tests/covariance/test_var.py index 6fecae1738..ec616a96e3 100644 --- a/arch/tests/covariance/test_var.py +++ b/arch/tests/covariance/test_var.py @@ -1,4 +1,3 @@ -from itertools import product from typing import Optional, Tuple import numpy as np @@ -9,8 +8,6 @@ from arch.covariance.var import PreWhitenRecoloredCovariance from arch.typing import NDArray -DATA_PARAMS = list(product([1, 3], [True, False], [0])) # , 1, 3])) -DATA_IDS = [f"dim: {d}, pandas: {p}, order: {o}" for d, p, o in DATA_PARAMS] KERNELS = [ "Bartlett", "Parzen", @@ -32,40 +29,6 @@ def kernel(request): return request.param -@pytest.fixture(scope="module", params=DATA_PARAMS, ids=DATA_IDS) -def data(request): - dim, pandas, order = request.param - rs = np.random.RandomState([839084, 3823810, 982103, 829108]) - burn = 100 - shape = (burn + 500,) - if dim > 1: - shape += (3,) - rvs = rs.standard_normal(shape) - phi = np.zeros((order, dim, dim)) - if order > 0: - phi[0] = np.eye(dim) * 0.4 + 0.1 - for i in range(1, order): - phi[i] = 0.3 / (i + 1) * np.eye(dim) - for i in range(order, burn + 500): - for j in range(order): - if dim == 1: - rvs[i] += np.squeeze(phi[j] * rvs[i - j - 1]) - else: - rvs[i] += phi[j] @ rvs[i - j - 1] - if order > 1: - p = np.eye(dim * order, dim * order, -dim) - for j in range(order): - p[:dim, j * dim : (j + 1) * dim] = phi[j] - v, _ = np.linalg.eig(p) - assert np.max(np.abs(v)) < 1 - rvs = rvs[burn:] - if pandas and dim == 1: - return pd.Series(rvs, name="x") - elif pandas: - return pd.DataFrame(rvs, columns=[f"x{i}" for i in range(dim)]) - return rvs - - def direct_var( x, const: bool, full_order: int, diag_order: int, max_order: Optional[int] = None ) -> Tuple[NDArray, NDArray]: @@ -140,21 +103,23 @@ def direct_ic( @pytest.mark.parametrize("diag_order", [3, 5]) @pytest.mark.parametrize("max_order", [None, 10]) @pytest.mark.parametrize("ic", ["aic", "bic", "hqc"]) -def test_direct_var(data, const, full_order, diag_order, max_order, ic): - direct_ic(data, ic, const, full_order, diag_order, max_order) +def test_direct_var(covariance_data, const, full_order, diag_order, max_order, ic): + direct_ic(covariance_data, ic, const, full_order, diag_order, max_order) @pytest.mark.parametrize("center", [True, False]) @pytest.mark.parametrize("diagonal", [True, False]) @pytest.mark.parametrize("method", ["aic", "bic", "hqc"]) -def test_ic(data, center, diagonal, method): +def test_ic(covariance_data, center, diagonal, method): pwrc = PreWhitenRecoloredCovariance( - data, center=center, diagonal=diagonal, method=method, bandwidth=0.0, + covariance_data, center=center, diagonal=diagonal, method=method, bandwidth=0.0, ) cov = pwrc.cov - expected_type = np.ndarray if isinstance(data, np.ndarray) else pd.DataFrame + expected_type = ( + np.ndarray if isinstance(covariance_data, np.ndarray) else pd.DataFrame + ) assert isinstance(cov.short_run, expected_type) - expected_max_lag = int(data.shape[0] ** (1 / 3)) + expected_max_lag = int(covariance_data.shape[0] ** (1 / 3)) assert pwrc._max_lag == expected_max_lag expected_ics = {} for full_order in range(expected_max_lag + 1): @@ -162,7 +127,12 @@ def test_ic(data, center, diagonal, method): for diag_order in range(full_order, diag_limit): key = (full_order, diag_order) expected_ics[key] = direct_ic( - data, method, center, full_order, diag_order, max_order=expected_max_lag + covariance_data, + method, + center, + full_order, + diag_order, + max_order=expected_max_lag, ) assert tuple(sorted(pwrc._ics.keys())) == tuple(sorted(expected_ics.keys())) for key in expected_ics: @@ -175,13 +145,18 @@ def test_ic(data, center, diagonal, method): @pytest.mark.parametrize("diagonal", [True, False]) @pytest.mark.parametrize("method", ["aic", "bic", "hqc"]) @pytest.mark.parametrize("lags", [0, 1, 3]) -def test_short_long_run(data, center, diagonal, method, lags): +def test_short_long_run(covariance_data, center, diagonal, method, lags): pwrc = PreWhitenRecoloredCovariance( - data, center=center, diagonal=diagonal, method=method, lags=lags, bandwidth=0.0, + covariance_data, + center=center, + diagonal=diagonal, + method=method, + lags=lags, + bandwidth=0.0, ) cov = pwrc.cov full_order, diag_order = pwrc._order - params, resids = direct_var(data, center, full_order, diag_order) + params, resids = direct_var(covariance_data, center, full_order, diag_order) nobs, nvar = resids.shape expected_short_run = resids.T @ resids / nobs assert_allclose(cov.short_run, expected_short_run) @@ -195,10 +170,27 @@ def test_short_long_run(data, center, diagonal, method, lags): assert_allclose(cov.long_run, expected_long_run) +@pytest.mark.parametrize("force_int", [True, False]) +def test_pwrc_attributes(covariance_data, force_int): + pwrc = PreWhitenRecoloredCovariance(covariance_data, force_int=force_int) + assert isinstance(pwrc.bandwidth_scale, float) + assert isinstance(pwrc.kernel_const, float) + assert isinstance(pwrc.rate, float) + assert isinstance(pwrc._weights(), np.ndarray) + assert pwrc.force_int == force_int + expected_type = ( + np.ndarray if isinstance(covariance_data, np.ndarray) else pd.DataFrame + ) + assert isinstance(pwrc.cov.short_run, expected_type) + assert isinstance(pwrc.cov.long_run, expected_type) + assert isinstance(pwrc.cov.one_sided, expected_type) + assert isinstance(pwrc.cov.one_sided_strict, expected_type) + + @pytest.mark.parametrize("sample_autocov", [True, False]) -def test_data(data, sample_autocov): +def test_data(covariance_data, sample_autocov): pwrc = PreWhitenRecoloredCovariance( - data, sample_autocov=sample_autocov, bandwidth=0.0 + covariance_data, sample_autocov=sample_autocov, bandwidth=0.0 ) pwrc.cov @@ -217,3 +209,8 @@ def test_pwrc_warnings(): x = np.random.standard_normal((9, 5)) with pytest.warns(RuntimeWarning, match="The maximum number of lags is 0"): PreWhitenRecoloredCovariance(x).cov + + +def test_unknown_kernel(covariance_data): + with pytest.raises(ValueError, match=""): + PreWhitenRecoloredCovariance(covariance_data, kernel="unknown") diff --git a/arch/tests/unitroot/test_dynamic_ols.py b/arch/tests/unitroot/test_dynamic_ols.py index 12c3be76ea..aabfd86ea9 100644 --- a/arch/tests/unitroot/test_dynamic_ols.py +++ b/arch/tests/unitroot/test_dynamic_ols.py @@ -16,7 +16,17 @@ def test_smoke(data, trend, lags, leads, common, max_lag, method): y, x = data if common: leads = lags - mod = DynamicOLS(y, x, trend, lags, leads, common, max_lag, max_lag, method) + mod = DynamicOLS( + y, + x, + trend, + lags=lags, + leads=leads, + common=common, + max_lag=max_lag, + max_lead=max_lag, + method=method, + ) mod.fit() @@ -27,8 +37,14 @@ def test_smoke(data, trend, lags, leads, common, max_lag, method): @pytest.mark.parametrize("df_adjust", [True, False]) def test_smoke_fit(data, cov_type, kernel, bandwidth, force_int, df_adjust): y, x = data - mod = DynamicOLS(y, x, "ct", 3, 5, False) - res = mod.fit(cov_type, kernel, bandwidth, force_int, df_adjust) + mod = DynamicOLS(y, x, "ct", lags=3, leads=5, common=False) + res = mod.fit( + cov_type, + kernel=kernel, + bandwidth=bandwidth, + force_int=force_int, + df_adjust=df_adjust, + ) assert isinstance(res.leads, int) assert isinstance(res.lags, int) assert isinstance(res.bandwidth, (int, float)) @@ -44,7 +60,7 @@ def test_smoke_fit(data, cov_type, kernel, bandwidth, force_int, df_adjust): def test_mismatch_lead_lag(data): y, x = data with pytest.raises(ValueError, match="common is specified but leads"): - DynamicOLS(y, x, "c", 4, 5, True) + DynamicOLS(y, x, "c", lags=4, leads=5, common=True) with pytest.raises(ValueError, match="common is specified but max_lead"): DynamicOLS(y, x, max_lag=6, max_lead=7, common=True) diff --git a/arch/tests/unitroot/test_fmols_ccr.py b/arch/tests/unitroot/test_fmols_ccr.py index b22bd1c583..e9ed034383 100644 --- a/arch/tests/unitroot/test_fmols_ccr.py +++ b/arch/tests/unitroot/test_fmols_ccr.py @@ -20,8 +20,8 @@ def test_fmols_smoke( y, x = trivariate_data if x_trend is not None and len(x_trend) < len(trend): x_trend = trend - mod = FullyModifiedOLS(y, x, trend, x_trend) - res = mod.fit(kernel, bandwidth, force_int, diff) + mod = FullyModifiedOLS(y, x, trend, x_trend=x_trend) + res = mod.fit(kernel=kernel, bandwidth=bandwidth, force_int=force_int, diff=diff) assert isinstance(res.summary(), Summary) @@ -35,8 +35,8 @@ def test_ccr_smoke(trivariate_data, trend, x_trend, diff, kernel, bandwidth, for y, x = trivariate_data if x_trend is not None and len(x_trend) < len(trend): x_trend = trend - mod = CanonicalCointegratingReg(y, x, trend, x_trend) - res = mod.fit(kernel, bandwidth, force_int, diff) + mod = CanonicalCointegratingReg(y, x, trend, x_trend=x_trend) + res = mod.fit(kernel=kernel, bandwidth=bandwidth, force_int=force_int, diff=diff) assert isinstance(res.summary(), Summary) @@ -189,7 +189,7 @@ def test_fmols_eviews(trivariate_data, test_key): trend, x_trend, diff = test_key key = (trend, x_trend, diff) test_res = setup_test_values(FMOLS_RES[key]) - mod = FullyModifiedOLS(y, x, trend, x_trend) + mod = FullyModifiedOLS(y, x, trend, x_trend=x_trend) # BW is one less than what Eviews reports res = mod.fit(bandwidth=6, force_int=True, diff=diff, df_adjust=True) if trend != "ctt": @@ -322,7 +322,7 @@ def test_ccr_eviews(trivariate_data, test_key): trend, x_trend, diff = test_key key = (trend, x_trend, diff) test_res = setup_test_values(CCR_RES[key]) - mod = CanonicalCointegratingReg(y, x, trend, x_trend) + mod = CanonicalCointegratingReg(y, x, trend, x_trend=x_trend) # BW is one less than what Eviews reports res = mod.fit(bandwidth=6, force_int=True, diff=diff, df_adjust=True) if trend == "n": diff --git a/arch/tests/unitroot/test_phillips_ouliaris.py b/arch/tests/unitroot/test_phillips_ouliaris.py index b0323dc6af..588e4f21a5 100644 --- a/arch/tests/unitroot/test_phillips_ouliaris.py +++ b/arch/tests/unitroot/test_phillips_ouliaris.py @@ -2,6 +2,7 @@ import numpy as np from numpy.testing import assert_allclose +import pandas as pd import pytest from statsmodels.iolib.summary import Summary @@ -165,3 +166,65 @@ def test_auto_bandwidth(trivariate_data): assert int(res.bandwidth) != res.bandwidth res = phillips_ouliaris(y, x, force_int=True) assert int(res.bandwidth) == res.bandwidth + + +REFERENCE = ( + ( + "n", + [ + [-4.017712, 0.0062, -31.26709, 0.0079], + [-4.207778, 0.0033, -33.83294, 0.0046], + [-4.218211, 0.0031, -33.87041, 0.0045], + ], + ), + ( + "c", + [ + [-29.56978, 0.0000, -1210.576, 0.0001], + [-32.09233, 0.0000, -976.0578, 0.0001], + [-31.93710, 0.0000, -986.2292, 0.0001], + ], + ), + ( + "ct", + [ + [-27.55381, 0.0001, -1179.019, 0.0001], + [-32.22650, 0.0001, -967.9824, 0.0001], + [-31.75132, 0.0001, -999.3838, 0.0001], + ], + ), + ( + "ctt", + [ + [-27.61660, 0.0000, -1180.209, 0.0001], + [-32.50041, 0.0000, -950.8720, 0.0001], + [-31.94475, 0.0000, -983.4933, 0.0001], + ], + ), +) + + +@pytest.mark.parametrize("result", REFERENCE) +def test_against_ref(trivariate_data, result): + trend = result[0] + ref = result[1] + for i in range(3): + ref_row = ref[i] + x_idx = list(np.arange(3)) + x_idx.pop(i) + y_idx = [i] + if isinstance(trivariate_data[0], pd.DataFrame): + z = pd.concat(trivariate_data, axis=1) + x = z.iloc[:, x_idx] + y = z.iloc[:, y_idx] + else: + z = np.column_stack(trivariate_data) + x = z[:, x_idx] + y = z[:, y_idx] + zt = phillips_ouliaris(y, x, trend=trend, test_type="Zt", bandwidth=9) + za = phillips_ouliaris(y, x, trend=trend, test_type="Za", bandwidth=9) + scale = y.shape[0] / (y.shape[0] - 1) + assert_allclose(zt.stat, np.sqrt(scale) * ref_row[0], rtol=1e-4) + assert_allclose(zt.pvalue, ref_row[1], atol=1e-3) + assert_allclose(za.stat, scale * ref_row[2], rtol=1e-4) + assert_allclose(za.pvalue, ref_row[3], atol=1e-3) diff --git a/arch/unitroot/cointegration.py b/arch/unitroot/cointegration.py index c4ca827473..1939302b43 100644 --- a/arch/unitroot/cointegration.py +++ b/arch/unitroot/cointegration.py @@ -523,6 +523,7 @@ def __init__( y: ArrayLike1D, x: ArrayLike2D, trend: str = "c", + *, lags: Optional[int] = None, leads: Optional[int] = None, common: bool = False, @@ -671,6 +672,7 @@ def _leads_and_lags(self) -> Tuple[int, int]: def fit( self, cov_type: str = "unadjusted", + *, kernel: str = "bartlett", bandwidth: Optional[int] = None, force_int: bool = False, @@ -788,12 +790,16 @@ def _cov( kernel_est = KERNEL_ESTIMATORS[kernel] scale = nobs / (nobs - nx) if df_adjust else 1.0 if cov_type in ("unadjusted", "homoskedastic"): - est = kernel_est(eps, bandwidth, center=False, force_int=force_int) + est = kernel_est( + eps, bandwidth=bandwidth, center=False, force_int=force_int + ) sigma2 = np.squeeze(est.cov.long_run) cov = (scale * sigma2) * sigma_xx_inv / nobs elif cov_type in ("robust", "kernel"): scores = x * eps - est = kernel_est(scores, bandwidth, center=False, force_int=force_int) + est = kernel_est( + scores, bandwidth=bandwidth, center=False, force_int=force_int + ) s = est.cov.long_run cov = scale * sigma_xx_inv @ s @ sigma_xx_inv / nobs else: @@ -969,6 +975,7 @@ def __init__( y: ArrayLike1D, x: ArrayLike2D, trend: str = "c", + *, x_trend: Optional[str] = None, ) -> None: setup = _check_cointegrating_regression(y, x, trend) @@ -1025,6 +1032,7 @@ def _final_statistics(self, theta: pd.Series) -> Tuple[pd.Series, float, float]: def fit( self, + *, kernel: str = "bartlett", bandwidth: Optional[float] = None, force_int: bool = True, @@ -1122,13 +1130,15 @@ def __init__( y: ArrayLike1D, x: ArrayLike2D, trend: str = "c", + *, x_trend: Optional[str] = None, ) -> None: - super().__init__(y, x, trend, x_trend) + super().__init__(y, x, trend, x_trend=x_trend) @Appender(FullyModifiedOLS.fit.__doc__) def fit( self, + *, kernel: str = "bartlett", bandwidth: Optional[float] = None, force_int: bool = True, diff --git a/arch/unitroot/critical_values/simulation/engle_granger_simulation.py b/arch/unitroot/critical_values/simulation/engle_granger_simulation.py index 56307c77d8..26e3f5210a 100644 --- a/arch/unitroot/critical_values/simulation/engle_granger_simulation.py +++ b/arch/unitroot/critical_values/simulation/engle_granger_simulation.py @@ -11,10 +11,11 @@ from typing import List import colorama -from joblib import Parallel, cpu_count, delayed import numpy as np from numpy.random import PCG64, Generator, SeedSequence +from joblib import Parallel, cpu_count, delayed + ROOT = os.path.join(os.path.split(os.path.abspath(__file__))[0], "engle-granger") if not os.path.exists(ROOT): os.mkdir(ROOT) diff --git a/arch/unitroot/critical_values/simulation/phillips-ouliaris-simulation.py b/arch/unitroot/critical_values/simulation/phillips-ouliaris-simulation.py index 19108a617c..de65743339 100644 --- a/arch/unitroot/critical_values/simulation/phillips-ouliaris-simulation.py +++ b/arch/unitroot/critical_values/simulation/phillips-ouliaris-simulation.py @@ -6,15 +6,15 @@ from typing import Optional import colorama -from joblib import Parallel, delayed import numpy as np from numpy.linalg import inv, lstsq, solve import pandas as pd -import psutil from arch.typing import NDArray from arch.utility.timeseries import add_trend +from joblib import Parallel, delayed from phillips_ouliaris import QUANTILES, ROOT, SAMPLE_SIZES, TRENDS +import psutil GREEN = colorama.Fore.GREEN BLUE = colorama.Fore.BLUE