diff --git a/fypy/pricing/fourier/GilPeleazEuropeanPricer.py b/fypy/pricing/fourier/GilPeleazEuropeanPricer.py index 4904e90..40e1c6f 100644 --- a/fypy/pricing/fourier/GilPeleazEuropeanPricer.py +++ b/fypy/pricing/fourier/GilPeleazEuropeanPricer.py @@ -42,4 +42,4 @@ def price(self, T: float, K: float, is_call: bool): if not is_call: price -= sf - K * disc - return price + return price \ No newline at end of file diff --git a/fypy/pricing/fourier/ProjAsianPricer.py b/fypy/pricing/fourier/ProjAsianPricer.py index d02c300..51ab231 100644 --- a/fypy/pricing/fourier/ProjAsianPricer.py +++ b/fypy/pricing/fourier/ProjAsianPricer.py @@ -461,4 +461,4 @@ def _price_update(self, option_params: np.ndarray, grid_params: np.ndarray, beta Val = Val + option_params['C'].item() * np.exp(-r * T) * mult - K[index] * np.exp(-r * T) - output[index] = max(0, Val) + output[index] = max(0, Val) \ No newline at end of file diff --git a/fypy/pricing/fourier/StochVol/ProjAmericanRichardson.py b/fypy/pricing/fourier/StochVol/ProjAmericanRichardson.py new file mode 100644 index 0000000..dd6ebec --- /dev/null +++ b/fypy/pricing/fourier/StochVol/ProjAmericanRichardson.py @@ -0,0 +1,16 @@ +import numpy as np + + +from fypy.model.FourierModel import FourierModel +from fypy.pricing.fourier.StochVol.ProjBermudanPricer_SV import ProjBermudanPricer_SV + + +class ProjAmericanRichardson: + def __init__(self, model: FourierModel, N: int = 2**11): + self._bermudan_pricer = ProjBermudanPricer_SV(model, N) + return + + def price(self, T: float, W: int, S0: float, M: int, is_call: bool) -> float: + price2M = self._bermudan_pricer.price(T, W, S0, 2 * M, is_call) + priceM = self._bermudan_pricer.price(T, W, S0, M, is_call) + return 2 * price2M - priceM diff --git a/fypy/pricing/fourier/StochVol/ProjAsianPricer_SV.py b/fypy/pricing/fourier/StochVol/ProjAsianPricer_SV.py new file mode 100644 index 0000000..9073ed8 --- /dev/null +++ b/fypy/pricing/fourier/StochVol/ProjAsianPricer_SV.py @@ -0,0 +1,344 @@ +import numpy as np + +# np.set_printoptions(precision=4) + +from fypy.model.FourierModel import FourierModel +from fypy.termstructures.EquityForward import EquityForward + + +from scipy.io import loadmat as loadmat +from fypy.pricing.fourier.StochVol.StochVolParams import ( + GridParamsGeneric, + NumericalParams, + ExponentialMat, + TYPES, +) + +from fypy.pricing.fourier.StochVol.StochVolPricer import RecursiveReturnPricer + + +class GridParams(GridParamsGeneric): + def __init__( + self, + W: float, + S0: float, + N: int, + alpha: float, + T: float, + M: int, + P: int, + Pbar: int, + ): + super().__init__(W, S0, N, alpha, T, M) + self.init_variables(P=P, Pbar=Pbar) + + def init_variables(self, *, P: int, Pbar: int, **kwargs): + self.P = P + self.Pbar = Pbar + self.a = 2**P + self.C_an = 1 + self.dx = 1 / self.a + self.dxi = self._get_dxi() + self.xi = self._get_xi() + self.hlocalCF = lambda x: np.log(1 + np.exp(x)) + self.upsilon = (32 * self.a**4) / self.N + return + + def conditional_variables(self): + return + + def update_and_create_variables(self): + return + + +class RecursivePrice(RecursiveReturnPricer): + def __init__( + self, + model: FourierModel, + mat: ExponentialMat, + grid: GridParamsGeneric, + num_params: NumericalParams, + ): + super().__init__(model=model, exp_mat=mat, grid=grid, num_params=num_params) + self.transit_mat = self.exp_mat.get_exp_tensor() + + def _set_left_and_NMM(self, **kwargs): + self._get_mum_xm_nm_vec(self.grid) + self.leftGridPoint = self.xm[0] + self.NNM = int(self.grid.N + self.nm_vec[self.grid.M - 2]) + + def _get_PhiY(self): + self.zeta = self._get_zeta_vec(self.grid.a, self.grid.xi) + self.grid.xi = self.grid.xi[1:] + self.chf = np.sum(self.transit_mat, axis=0).T + self._PSI_computation(a=self.grid.a) + self.psi = np.vstack((np.ones(self.psi.shape[1]), self.psi)) + self.psi[-1, :] = 0 + + def _beta_computation(self, xm: int, j: int): + gamma = 1 / (32 * self.grid.a**4) + h = np.zeros(self.grid.N, dtype="complex_") + h[0] = gamma + h[1:] = self.chf[1:, j] * self.zeta * np.exp(-1j * xm * self.grid.xi) + return np.real(np.fft.fft(h)) + + def _recursion(self): + M = self.grid.M + for m in np.arange(2, M + 1): + idx_m = m - 2 + self.phi_z = self._get_phi_z(idx_m) + self._update_chf() + return + + def _get_phi_z(self, idx_m: int): + phi_z = [] + for j in range(self.num_params.m0): + beta = self._beta_computation(self.xm[idx_m], j) + phi_z_j = self.grid.upsilon * ( + self.psi[ + :, + int(self.nm_vec[idx_m]) : (int(self.nm_vec[idx_m]) + self.grid.N), + ] + @ beta + ) + phi_z.append(phi_z_j) + return np.array(phi_z).T + + def _update_chf(self): + for j in range(self.num_params.m0): + self.chf[:, j] = self.phi_z[:, 0] * self.transit_mat[0, j, :] + for k in range(1, self.num_params.m0): + self.chf[:, j] += self.phi_z[:, k] * self.transit_mat[k, j, :] + + # µm, nm, xm vectors (equations 42 and 43) + def _get_mum_xm_nm_vec(self, grid: GridParamsGeneric): + dx = self.grid.dx + theta = grid.dt * self._model.forwardCurve.drift(0, grid.T) + self._get_nm_vec(theta=theta) + self.nm_vec = np.hstack((0, self.nm_vec)) + self.mu_vec = theta + self.nm_vec * dx + self.xm = self.mu_vec + (1 - 0.5 * self.grid.N) * dx + + def _get_nm_vec(self, theta: float): + m_vec = np.arange(2, self.grid.M + 1) + # if theta = 0, do tayor development + match bool(np.abs(theta) >= 1e-8): + case True: + self.nm_vec = np.floor( + self.grid.a + * np.log((np.exp(m_vec * theta) - 1) / (np.exp(theta) - 1)) + ).astype(int) + case False: + self.nm_vec = np.floor(self.grid.a * np.log(m_vec)).astype(int) + + +class ProjAsianPricer_SV: + def __init__(self, model: FourierModel, P: int = 2**11, Pbar: int = 2**5): + self._init_constants(P, Pbar) + # because jump chf is not implemented + self.model = model + + #################################################### + ######## 3 steps: Init, Recursion, Valuation ####### + #################################################### + + # Main funtion: pricing method + def price(self, T: float, W: int, S0: float, M: int, is_call: bool) -> float: + self._initialization(T, W, S0, M, is_call) + self._recursion() + price = self._valuation_stage() + return price + + def _initialization(self, T: float, W: int, S0: float, M: int, is_call: bool): + self._init_classes(T, W, S0, M, is_call) + self._init_tensors() + return + + def _init_tensors(self): + self._get_v_coef() + self.j0 = self.rec_pricer._get_k0() + self.zeta = self.rec_pricer._get_zeta_vec(self.grid.a, self.grid.xi) + self.transit_mat = self.exp_mat.get_exp_tensor() + self.chf = self._init_chf() + + def _init_classes(self, T: float, W: int, S0: float, M: int, is_call: bool): + self._check_call_put(is_call=is_call) + self.num_params = NumericalParams() + self.grid = GridParams( + W, S0, self.N, self.num_params.alpha, T, M, self.P, self.Pbar + ) + self.exp_mat = ExponentialMat(self.grid, self.model, self.num_params, T) + self.rec_pricer = RecursivePrice( + self.model, self.exp_mat, self.grid, self.num_params + ) + return + + def _recursion(self): + self.rec_pricer._get_PhiY() + self.rec_pricer._recursion() + + def _valuation_stage(self): + self._set_phi_y_phi_z() + self._set_valuation_constants() + pc_price = self._valuation_sub() + return pc_price + + def _valuation_sub(self): + val1, val2 = self._get_vals() + val = self._interp(val1, val2) + pc_price = self._pc_price(val) + return pc_price + + def _set_phi_y_phi_z(self): + self.chf = self.rec_pricer.chf + self.phi_z = self.rec_pricer.phi_z + + def _set_valuation_constants(self): + y_star = np.log(((self.grid.M + 1) * self.grid.W / self.grid.S0) - 1) + self.k_star = self._get_k_star(self.grid.dx, y_star) + self.gk = self._get_g_coef( + self.k_star, y_star, self.grid.S0, self.grid.W, self.grid.M + ) + self.xM = y_star - (self.k_star - 1) * self.grid.dx + + def _get_k_star(self, dx, y_star): + return int( + np.floor( + 1 + + self.grid.a + * (y_star - (self.rec_pricer.mu_vec[-1] + (1 - self.grid.N / 2) * dx)) + ) + ) + + def _pc_price(self, val: float) -> float: + C = self.grid.S0 / (self.grid.M + 1) + r = -np.log(self.model.discountCurve.discount_T(self.grid.T)) / self.grid.T + if isinstance(self.model.forwardCurve, EquityForward): + q = -np.log(self.model.forwardCurve._divDiscount(self.grid.T)) / self.grid.T + else: + raise ValueError( + "Forward curve of the model should be an equity forward to have access to dividend yield." + ) + T = self.grid.T + M = self.grid.M + pc_price = float( + val + + C + * np.exp(-r * T) + * (np.exp((r - q) * T * (1 + 1 / M)) - 1) + / (np.exp((r - q) * self.grid.dt) - 1) + - self.grid.W * np.exp(-r * T) + ) + return pc_price + + def _interp(self, val1: float, val2: float): + if isinstance(self.model, TYPES.Hes_base): + val = val1 + (val2 - val1) * ( + self.model.v_0 - self.exp_mat.get_v()[self.j0 - 1] + ) / (self.exp_mat.get_v()[self.j0] - self.exp_mat.get_v()[self.j0 - 1]) + return val + else: + raise NotImplementedError("Only Heston based models are implemented.") + + def _get_vals(self): + disc = self.model.discountCurve.discount_T(self.grid.T) + val1 = self._get_val(self.j0 - 1) + val2 = self._get_val(self.j0) + return disc * val1, disc * val2 + + def _get_val(self, k: int): + beta = self.rec_pricer._beta_computation(self.xM, k) + val2 = self.grid.upsilon * (beta[: self.k_star + 1] * self.gk).sum() + return val2 + + #################################################### + ####### INITIALIZATION auxiliary functions ######## + #################################################### + + # pricing parameters input + def _init_constants(self, P: int, Pbar: int): + self.P = P + self.Pbar = Pbar + self.a = 2**P # not the same as in the grid + self.abar = 2**Pbar + self.N = 2 ** (P + Pbar) + + # get v and vbar coefficients as defined in Table 15 appendix A + def _get_v_coef(self): + dx = self.grid.dx + self.v1 = (1 / 20) * ( + np.exp(-7 / 4 * dx) / 54 + + np.exp(-1.5 * dx) / 18 + + np.exp(-1.25 * dx) / 2 + + 7 * np.exp(-dx) / 27 + ) + + self.v0 = (1 / 20) * ( + 28 / 27 + + np.exp(-7 / 4 * dx) / 54 + + np.exp(-1.5 * dx) / 18 + + np.exp(-1.25 * dx) / 2 + + 14 * np.exp(-dx) / 27 + + 121 / 54 * np.exp(-0.75 * dx) + + 23 / 18 * np.exp(-0.5 * dx) + + 235 / 54 * np.exp(-0.25 * dx) + ) + + self.vm1 = (1 / 90) * ( + (28 + 7 * np.exp(-dx)) / 3 + + ( + 14 * np.exp(dx) + + np.exp(-7 / 4 * dx) + + 242 * np.cosh(0.75 * dx) + + 470 * np.cosh(0.25 * dx) + ) + / 12 + + 0.25 + * (np.exp(-1.5 * dx) + 9 * np.exp(-1.25 * dx) + 46 * np.cosh(0.5 * dx)) + ) + + self.vstar = (1 / 90) * ( + 14 / 3 * (2 + np.cosh(dx)) + + 0.5 + * (np.cosh(1.5 * dx) + 9 * np.cosh(1.25 * dx) + 23 * np.cosh(0.5 * dx)) + + 1 + / 6 + * ( + np.cosh(7 / 4 * dx) + + 121 * np.cosh(0.75 * dx) + + 235 * np.cosh(0.25 * dx) + ) + ) + self.vbar_star = 1 + self.vbar_m1 = 23 / 24 + self.vbar_0 = 1 / 2 + self.vbar_1 = 1 / 24 + return + + # characteristic function phi_Y initialized + def _init_chf(self): + return np.sum(self.transit_mat, axis=0).T + + def _beta_computation(self): + raise NotImplementedError + + def _get_g_coef(self, k_star: int, y_star: float, S0: float, K: float, M: int): + C = S0 / (M + 1) + Ek = np.exp(y_star + (np.arange(1, k_star + 2) - k_star) * self.grid.dx) + D = K - C + # equation (47) + gk = np.zeros(k_star + 1) + gk[: k_star - 2] = self.vbar_star * D - C * self.vstar * Ek[: k_star - 2] + gk[k_star - 2] = self.vbar_m1 * D - C * self.vm1 * Ek[k_star - 2] + gk[k_star - 1] = self.vbar_0 * D - C * self.v0 * Ek[k_star - 1] + gk[-1] = self.vbar_1 * D - C * self.v1 * Ek[-1] + return gk + + # check that contract asked is implemented (NOTE: to move in a proper class ?) + def _check_call_put(self, is_call: bool): + match is_call: + case False: + raise NotImplementedError("Only put pricing is implemented so far.") + case True: + pass + return diff --git a/fypy/pricing/fourier/StochVol/ProjBarrierPricer_SV.py b/fypy/pricing/fourier/StochVol/ProjBarrierPricer_SV.py new file mode 100644 index 0000000..135f783 --- /dev/null +++ b/fypy/pricing/fourier/StochVol/ProjBarrierPricer_SV.py @@ -0,0 +1,249 @@ +import numpy as np + + +from fypy.model.FourierModel import FourierModel + + +from fypy.pricing.fourier.StochVol.StochVolParams import ( + GridParamsGeneric, + NumericalParams, + ExponentialMat, + Toeplitz, + PayoffConstants, +) + + +from fypy.pricing.fourier.StochVol.StochVolPricer import NonRecursivePriceGeneric + + +class GridParams(GridParamsGeneric): + def __init__( + self, + W: float, + S0: float, + N: int, + H: int, + alpha: float, + T: float, + M: int, + down: bool, + ): + super().__init__(W=W, S0=S0, N=N, alpha=alpha, T=T, M=M, H=H, down=down) + self.init_variables() + self.conditional_variables() + self.update_and_create_variables() + return + + def init_variables(self, **kwargs): + self.dx = self._get_dx() + self.a = self._get_a() + self.dxtil = self._get_dxtil() + self.K = self._get_K() + self.nnot = self._get_nnot() + self.xmin = self._get_xmin() + self.nbar = self._get_nbar() + self.pf_cons = PayoffConstants(self.dx, self.gauss_quad.b4, self.gauss_quad.b3) + return + + def conditional_variables(self): + if self.H is not None: + if self.down: + self.xmin = np.log(self.H / self.S0) + self.nnot = int(1 - self.xmin * self.a) + self.dx = self.xmin / (1 - self.nnot) + self.a = 1 / self.dx + else: + u = np.log(self.H / self.S0) + self.lws = np.log(self.W / self.S0) + self.nnot = int(self.K - self.a * u) + self.dx = u / (self.K - self.nnot) + self.a = 1 / self.dx + self.xmin = u - (self.K - 1) * self.dx + else: + raise ValueError("H should be initialized") + return + + def update_and_create_variables(self): + self.nbar = self._get_nbar() + self.rho = self._get_rho() + self.zeta = self._get_zeta() + self.dxi = self._get_dxi() + self.xi = self._get_xi() + self.gs = self._get_gs() + self.thetM = self._get_thetM() + return + + +class RecursivePrice(NonRecursivePriceGeneric): + def __init__( + self, + mat: ExponentialMat, + toep: Toeplitz, + grid: GridParamsGeneric, + num_params: NumericalParams, + ): + super().__init__(mat=mat, toep=toep, grid=grid, num_params=num_params) + + def _get_val(self, k0: int): + val1 = self.cont[self.grid.nnot - 1, k0 - 1] + val2 = self.cont[self.grid.nnot - 1, k0] + return val1, val2 + + def _init_variables(self): + self._thet = self._init_thet() + self.cont = self._create_cont() + return + + def _init_thet(self): + self._init_thet_constants() + thet = np.zeros((self.grid.K, self.num_params.m0)) + thet[self.grid.nbar - 1, 0] = self._init_thet_sub1() + thet[self.grid.nbar, 0] = self._init_thet_sub2() + thet[self.grid.nbar + 1 : self.grid.K, 0] = self._init_thet_sub3() + return thet + + def _init_thet_sub3(self): + return ( + np.exp( + self.grid.xmin + + self.grid.dx * np.arange(self.grid.nbar + 1, self.grid.K) + ) + * self.grid.S0 + * self.grid.pf_cons.varthet_star + - self.grid.W + ) + + def _init_thet_sub2(self): + return self.grid.W * ( + np.exp(self.grid.dx - self.grid.rho) + * (self.grid.pf_cons.varthet_01 + self.d1) + - (0.5 + self.dbar1) + ) + + def _init_thet_sub1(self): + return self.grid.W * (np.exp(-self.grid.rho) * self.d0 - self.dbar0) + + def _init_thet_constants(self): + self._init_sigma() + self._init_es() + self._init_dbar() + self._init_d() + + def _init_d(self): + self.d0 = self._get_d0() + self.d1 = self._get_d1() + + def _init_dbar(self): + self.dbar0 = 0.5 + self.grid.zeta * (0.5 * self.grid.zeta - 1) + self.dbar1 = self.sigma * (1 - 0.5 * self.sigma) + + def _init_es(self): + self.es1 = np.exp(self.grid.dx * self.sigma_plus) + self.es2 = np.exp(self.grid.dx * self.sigma_minus) + + def _init_sigma(self): + self.sigma = 1 - self.grid.zeta + self.sigma_plus = (self.grid.gauss_quad.q_plus - 0.5) * self.sigma + self.sigma_minus = (self.grid.gauss_quad.q_minus - 0.5) * self.sigma + + def _get_d1(self): + return ( + np.exp((self.grid.rho - self.grid.dx) * 0.5) + * self.sigma + / 18 + * ( + 5 + * ( + (0.5 * (self.grid.zeta + 1) + self.sigma_minus) * self.es2 + + (0.5 * (self.grid.zeta + 1) + self.sigma_plus) * self.es1 + ) + + 4 * (self.grid.zeta + 1) + ) + ) + + def _get_d0(self): + return ( + np.exp((self.grid.rho + self.grid.dx) * 0.5) + * self.sigma**2 + / 18 + * ( + 5 + * ( + (1 - self.grid.gauss_quad.q_minus) * self.es2 + + (1 - self.grid.gauss_quad.q_plus) * self.es1 + ) + + 4 + ) + ) + + def _update_thet(self, j: int) -> None: + self._thet[0, j] = self._update_thet_sub1(j) + self._thet[-1, j] = self._update_thet_sub2(j) + self._thet[1 : self.grid.K - 1, j] = self._update_thet_sub3(j) + return + + def _update_thet_sub3(self, j): + return ( + self.cont[0 : self.grid.K - 2, j] + + 10 * self.cont[1 : self.grid.K - 1, j] + + self.cont[2 : self.grid.K, j] + ) / 12 + + def _update_thet_sub2(self, j): + return ( + 13 * self.cont[-1, j] + + 15 * self.cont[-2, j] + - 5 * self.cont[-3, j] + + self.cont[-4, j] + ) / 48 + + def _update_thet_sub1(self, j): + return ( + 13 * self.cont[0, j] + + 15 * self.cont[1, j] + - 5 * self.cont[2, j] + + self.cont[3, j] + ) / 48 + + def _vec_for_thet_update(self): + return self._thet[: self.grid.K, 0] + + +class ProjBarrierPricer_SV: + def __init__(self, model: FourierModel, N: int = 2**11): + self._init_constants(N) + self.model = model + + def _init_constants(self, N: int): + self.N = N + + # Main funtion: pricing method + def price( + self, T: float, W: int, S0: float, M: int, H: int, down: bool, is_call: bool + ) -> float: + self._check_call_put(is_call=is_call) + self.initialization(T, W, S0, M, H, down) + price = self.recursion.recursive_price() + return price + + def initialization(self, T: float, W: int, S0: float, M: int, H: int, down: bool): + self.num_params = NumericalParams() + self.grid = GridParams( + W, S0, self.N, alpha=self.num_params.alpha, T=T, M=M, H=H, down=down + ) + self.exp_mat = ExponentialMat(self.grid, self.model, self.num_params, T) + self.toep = Toeplitz(self.grid, self.model) + self.recursion = RecursivePrice( + self.exp_mat, self.toep, self.grid, self.num_params + ) + return + + def _beta_computation(self): + return + + def _check_call_put(self, is_call: bool): + match is_call: + case False: + raise NotImplementedError("Only put pricing is implemented so far.") + case True: + pass diff --git a/fypy/pricing/fourier/StochVol/ProjBermudanPricer_SV.py b/fypy/pricing/fourier/StochVol/ProjBermudanPricer_SV.py new file mode 100644 index 0000000..9273802 --- /dev/null +++ b/fypy/pricing/fourier/StochVol/ProjBermudanPricer_SV.py @@ -0,0 +1,263 @@ +import numpy as np + +# np.set_printoptions(precision=4) + +from fypy.model.FourierModel import FourierModel + +from fypy.pricing.fourier.StochVol.StochVolParams import ( + Toeplitz, + GridParamsGeneric, + NumericalParams, + ExponentialMat, + PayoffConstants, +) +from fypy.pricing.fourier.StochVol.StochVolPricer import NonRecursivePriceGeneric + + +class GridParams(GridParamsGeneric): + def __init__(self, W: float, S0: float, N: int, alpha: float, T: float, M: int): + super().__init__(W, S0, N, alpha, T, M) + self.init_variables() + self.conditional_variables() + self.update_and_create_variables() + + def init_variables(self, **kwargs): + self.dx = self._get_dx() + self.a = self._get_a() + self.dxtil = self._get_dxtil() + self.K = self._get_K() + self.nnot = self._get_nnot() + self.xmin = self._get_xmin() + self.nbar = self._get_nbar() + return + + def conditional_variables(self): + if abs(self.lws) < self.dxtil: + self.dx = self.dxtil + elif self.lws < 0: + self.dx = self.lws / (1 + self.nbar - self.K / 2) + self.nbar += 1 + elif self.lws > 0: + self.dx = self.lws / (self.nbar - self.K / 2) + return + + def update_and_create_variables(self): + self.pf_cons = PayoffConstants(self.dx, self.gauss_quad.b4, self.gauss_quad.b3) + self.a = 1 / self.dx + self.xmin = (1 - self.K / 2) * self.dx + self.rho = self._get_rho() + self.zeta = self._get_zeta() + self.dxi = self._get_dxi() + self.xi = self._get_xi() + self.gs = self._get_gs() + self.thetM = self._get_thetM() + return + + +class ProjBermudanPricer_SV: + def __init__(self, model: FourierModel, N: int = 2**11): + self._init_constants(N) + self.model = model + + def _init_constants(self, N: int): + self.N = N + + # Main funtion: pricing method + def price(self, T: float, W: int, S0: float, M: int, is_call: bool) -> float: + self._check_call_put(is_call=is_call) + num_params = NumericalParams() + grid = GridParams(W, S0, self.N, alpha=num_params.alpha, T=T, M=M) + exp_mat = ExponentialMat(grid, self.model, num_params, T) + toep = Toeplitz(grid, self.model) + recursion = RecursivePricer(exp_mat, toep, grid, num_params) + price = recursion.recursive_price() + return price + + # need to initialize ProjPricer + def _beta_computation(self, exp_mat: np.ndarray): + raise NotImplementedError + + # check that contract asked is implemented (NOTE: to move in a proper class ?) + def _check_call_put(self, is_call: bool): + match is_call: + case True: + raise NotImplementedError("Only put pricing is implemented so far.") + case False: + pass + + +class RecursivePricer(NonRecursivePriceGeneric): + def __init__( + self, + mat: ExponentialMat, + toep: Toeplitz, + grid: GridParams, + num_params: NumericalParams, + ): + super().__init__(mat, toep, grid, num_params) + return + + def _get_val(self, k0: int): + nnot = self.grid.nnot + val1 = np.max([self.cont[nnot - 1, k0 - 1], self.grid.gs[nnot - 1]]) + val2 = np.max([self.cont[nnot - 1, k0], self.grid.gs[nnot - 1]]) + return val1, val2 + + def _init_variables(self): + self._thet = self._init_thet() + self.cont = self._create_cont() + self._kstr = np.full(self.num_params.m0, self.grid.nbar - 1, dtype="int") + return + + def _init_thet(self): + return np.zeros((self.grid.K, self.num_params.m0)) + + def _vec_for_thet_update(self): + return self.grid.thetM + + def _update_thet(self, j: int) -> None: + self._update_thet_constants(j) + self._update_thet_sub(j) + + def _update_thet_constants(self, j): + self._while(j) + self._update_kstr(j) + self.rho = self._xstrs - self._xkstr + self._update_zeta() + # NOTE: Why don't move all of these constant in a class depending only self.grid, self.gauss_quad and _xstrs skstr + self._update_ed() + self._update_dbar() + self._update_d() + self.idx_j = self._kstr[j] + self.ck4 = self.cont[self.idx_j + 2, j] + return + + def _update_d(self): + self.d0 = self._get_d0() + self.d1 = self._get_d1() + + def _update_thet_sub(self, j): + idx_j = self.idx_j + self._thet[:idx_j, j] = self.grid.thetM[:idx_j] + self._thet[self.grid.K - 1, j] = self._update_thet_sub1(j) + self._thet[idx_j + 1, j] = self._update_thet_sub2( + self.dbar1, self.zeta, self.d1 + ) + self._thet[idx_j + 2 : (self.grid.K - 1), j] = self._update_thet_sub3(j, idx_j) + self._thet[idx_j, j] = self._update_thet_sub4(self.dbar0, self.zeta, self.d0) + + def _update_zeta(self): + self.zeta = self.grid.a * self.rho + self.zeta_minus = self.zeta * self.grid.gauss_quad.q_minus + self.zeta_plus = self.zeta * self.grid.gauss_quad.q_plus + + def _update_ed(self): + self.ed1 = np.exp(self.rho * self.grid.gauss_quad.q_minus) + self.ed2 = np.exp(0.5 * self.rho) + self.ed3 = np.exp(self.rho * self.grid.gauss_quad.q_plus) + + def _update_dbar(self): + self.dbar1 = self.zeta**2 / 2 + self.dbar0 = self.zeta - self.dbar1 + + def _get_d1(self): + return ( + np.exp(-self.grid.dx) + * self.zeta + * ( + 5 * (self.zeta_minus * self.ed1 + self.zeta_plus * self.ed3) + + 4 * self.zeta * self.ed2 + ) + / 18 + ) + + def _get_d0(self): + return ( + self.zeta + * ( + 5 * ((1 - self.zeta_minus) * self.ed1 + (1 - self.zeta_plus) * self.ed3) + + 4 * (2 - self.zeta) * self.ed2 + ) + / 18 + ) + + def _update_thet_sub4(self, dbar0: float, zeta: float, d0: float): + return ( + self.grid.W * (0.5 + dbar0) + - self.grid.S0 * np.exp(self._xkstr) * (self.grid.pf_cons.varthet_m10 + d0) + + zeta**4 / 8 * (self._ck1 - 2 * self._ck2 + self._ck3) + + zeta**3 / 3 * (self._ck2 - self._ck1) + + zeta**2 / 4 * (self._ck1 + 2 * self._ck2 - self._ck3) + - zeta * self._ck2 + - self._ck1 / 24 + + 5 / 12 * self._ck2 + + self._ck3 / 8 + ) + + def _update_thet_sub3(self, j: int, idx_j: np.ndarray | int) -> np.ndarray: + return ( + self.cont[idx_j + 1 : self.grid.K - 2, j] + + 10 * self.cont[idx_j + 2 : self.grid.K - 1, j] + + self.cont[idx_j + 3 : self.grid.K, j] + ) / 12 + + def _update_thet_sub2(self, dbar1: float, zeta: float, d1: float) -> float: + return ( + self.grid.W * dbar1 + - self.grid.S0 * np.exp(self._xkstr + self.grid.dx) * d1 + + zeta**4 / 8 * (-self._ck2 + 2 * self._ck3 - self.ck4) + + zeta**3 / 6 * (3 * self._ck2 - 4 * self._ck3 + self.ck4) + - 0.5 * zeta**2 * self._ck2 + + (self._ck2 + 10 * self._ck3 + self.ck4) / 12 + ) + + def _update_thet_sub1(self, j: int) -> float: + return ( + 13 * self.cont[self.grid.K - 1, j] + + 15 * self.cont[self.grid.K - 2, j] + - 5 * self.cont[self.grid.K - 3, j] + + self.cont[self.grid.K - 4, j] + ) / 48 + + def _while(self, j: int) -> None: + idx = self._kstr[j] + while idx > 1 and (self.cont[idx, j] > self.grid.gs[idx]): + idx -= 1 + self._kstr[j] = idx + return + + def _update_kstr(self, j: int) -> None: + idx_j = self._kstr[j] + if self._kstr[j] >= 1: + self._update_ck(j, idx_j) + self._update_gk(idx_j) + self._update_temp() + self._update_x(idx_j) + + else: + self._update_x_other_case(j) + return + + def _update_x_other_case(self, j): + self._kstr[j] = 1 + self._xstrs = self.grid.xmin + self._xkstr = self.grid.xmin + + def _update_x(self, idx_j): + self._xkstr = self.grid.xmin + (idx_j) * self.grid.dx + self._xstrs = ( + (self._xkstr + self.grid.dx) * self.tmp1 - self._xkstr * self.tmp2 + ) / (self.tmp1 - self.tmp2) + + def _update_temp(self): + self.tmp1 = self._ck2 - self.gk2 + self.tmp2 = self._ck3 - self.gk3 + + def _update_gk(self, idx_j): + self.gk2 = self.grid.gs[idx_j] + self.gk3 = self.grid.gs[idx_j + 1] + + def _update_ck(self, j, idx_j): + self._ck1 = self.cont[idx_j - 1, j] + self._ck2 = self.cont[idx_j, j] + self._ck3 = self.cont[idx_j + 1, j] diff --git a/fypy/pricing/fourier/StochVol/ProjCliquetPricer_SV.py b/fypy/pricing/fourier/StochVol/ProjCliquetPricer_SV.py new file mode 100644 index 0000000..264b2e0 --- /dev/null +++ b/fypy/pricing/fourier/StochVol/ProjCliquetPricer_SV.py @@ -0,0 +1,369 @@ +import numpy as np + +# np.set_printoptions(precision=4) + +from fypy.model.FourierModel import FourierModel + + +from fypy.pricing.fourier.StochVol.StochVolParams import ( + GridParamsGeneric, + NumericalParams, + ExponentialMat, + AlphaRecursiveReturn, + TYPES, +) + +from fypy.pricing.fourier.StochVol.StochVolPricer import RecursiveReturnPricer + + +class GridParams(GridParamsGeneric): + def __init__( + self, + W: float, + S0: float, + N: int, + alpha: float, + T: float, + M: int, + C: float, + F: float, + contract: int, + ): + super().__init__(W, S0, N, alpha, T, M) + self.init_variables(alpha, C, F) + self.conditional_variables(contract, C, F) + self.update_and_create_variables() + + def init_variables(self, alpha: float, C: float, F: float): + self.alpha = alpha + self.dx = 2 * self.alpha / (self.N - 1) + self.a = 1 / self.dx + self.dt = self.T / self.M + self.xmin = (1 - self.N / 2) * self.dx + self.lc = np.log(1 + C) + self.lf = np.log(1 + F) + self.klc = int(np.floor(self.a * (self.lc - self.xmin)) + 1) + xklc = self.xmin + (self.klc - 1) * self.dx + self.xmin = self.xmin + (self.lc - xklc) + self.klf = int(np.floor(self.a * (self.lf - self.xmin)) + 1) + return + + def conditional_variables(self, contract: int, C: float, F: float): + match contract: + case 2 | 3 | 4: + if self.klc != self.klf: + self.dx = (self.lc - self.lf) / (self.klc - self.klf) + self.a = 1 / self.dx + self.xmin = self.lf - (self.klf - 1) * self.dx + + self.hlocalCF = ( + lambda x: F * (x <= self.lf) + + (np.exp(x) - 1) * (x < self.lc) * (x > self.lf) + + C * (x >= self.lc) + ) + case _: + raise NotImplementedError + return + + def update_and_create_variables(self): + self.A = 32 * self.a**4 + self.C_an = self.A / self.N + self.dxi = 2 * np.pi * self.a / self.N + self.xi = self.dxi * np.arange(self.N) + return + + +class RecursivePricer(RecursiveReturnPricer): + def __init__( + self, + model: FourierModel, + mat: ExponentialMat, + grid: GridParamsGeneric, + num_params: NumericalParams, + ): + super().__init__(model, grid, num_params, mat) + + def _set_left_and_NMM(self, contract: int): + match contract: + case 2 | 3 | 4: + self.leftGridPoint = self.grid.lf - self.grid.dx + # this is the left bound of gaussian quadrature grid + self.NNM = self.grid.klc - self.grid.klf + 3 + # this is N_Psi + case 1 | 5: + raise NotImplementedError + case _: + raise NotImplementedError + + def _init_phi(self): + self.phi_old = np.zeros((self.grid.N - 1, self.num_params.m0), dtype="complex_") + self.phi_y_new = np.zeros( + (self.grid.N - 1, self.num_params.m0), dtype="complex_" + ) + return + + def _get_phi_new(self, C: float, F: float): + self._set_expFC(C, F) + self._build_phi_new_iter() + return + + def _build_phi_new_iter(self): + for j in range(self.num_params.m0): + self._set_column_phi_old(j) + beta_temp = np.real( + np.fft.fft(np.hstack((1 / self.grid.A, self.phi_old[:, j] * self.hvec))) + ) + sumBetaLeft = self._get_sum_beta_left(beta_temp) + sumBetaRight = self._get_sum_beta_right(beta_temp, sumBetaLeft) + self._set_column_phi_new(j, sumBetaLeft, sumBetaRight, beta_temp) + + def _set_expFC(self, C, F): + self.expFxi = np.exp(1j * F * self.grid.xi) + self.expCxi = np.exp(1j * C * self.grid.xi) + + def _set_column_phi_old(self, j): + for n in range(self.grid.N - 1): + self.phi_old[n, j] = self.transit_mat[:, j, n + 1].sum() + + def _set_column_phi_new(self, j, sumBetaLeft, sumBetaRight, beta_temp): + col = self.psi @ beta_temp[self.grid.klf - 2 : self.grid.klc + 1] + self.phi_y_new[:, j] = ( + col + self.expFxi * sumBetaLeft + self.expCxi * sumBetaRight + ) + + def _get_sum_beta_right(self, beta_temp, sumBetaLeft): + return ( + 1 + - sumBetaLeft + - self.grid.C_an * beta_temp[self.grid.klf - 2 : self.grid.klc + 1].sum() + ) + + def _get_sum_beta_left(self, beta_temp): + sumBetaLeft = self.grid.C_an * beta_temp[: self.grid.klf - 2].sum() + return sumBetaLeft + + def _get_PhiY(self, C: float, F: float, contract: int): + self._PSI_computation(contract, self.grid.a) + self.transit_mat = self.exp_mat.get_exp_tensor() + self.zeta = self._get_zeta_vec(self.grid.a, self.grid.xi) + self.grid.xi = self.grid.xi[1:] + self.hvec = np.exp(-1j * self.grid.xmin * self.grid.xi) * self.zeta + self._init_phi() + + match contract: + case 2 | 3 | 4: + self._get_phi_new(C, F) + self._get_phi() + case _: + raise NotImplementedError + + return + + def _get_phi(self): + self.phi_y = np.zeros( + (self.num_params.m0, self.num_params.m0, self.grid.N - 1), dtype="complex_" + ) + xiBigF = self.expFxi + xiBigC = self.expCxi + + if self.grid.M > 1: + for j in range(self.num_params.m0): + for k in range(self.num_params.m0): + self._build_line_phi(xiBigF, xiBigC, j, k) + + def _build_line_phi(self, xiBigF, xiBigC, j, k): + grand = self.hvec * self.transit_mat[k, j, 1:] + beta_temp = np.real( + np.fft.fft(np.hstack((self.transit_mat[k, j, 0] / self.grid.A, grand))) + ) + sum_left, sum_right = self._get_sums(beta_temp) + self._compute_line_y(xiBigF, xiBigC, j, k, beta_temp, sum_left, sum_right) + + def _compute_line_y(self, xiBigF, xiBigC, j, k, beta_temp, sum_left, sum_right): + line = self.psi @ beta_temp[self.grid.klf - 2 : self.grid.klc + 1] + self.phi_y[j, k, :] = line + xiBigF * sum_left + xiBigC * sum_right + return + + def _get_sums(self, beta_temp): + sum_left = self.grid.C_an * beta_temp[: self.grid.klf - 2].sum() + sum_right = self.grid.C_an * beta_temp[self.grid.klc + 1 :].sum() + return sum_left, sum_right + + def _refining(self, contract: int, CG: float, FG: float): + match contract: + case 3: + dx = self.grid.dx + CmF = CG - FG + ymin = FG - dx + self.kc = np.floor(self.grid.a * (CG - ymin)) + 1 + z, z2, z3, z4, z5 = self._get_z(CG, dx, ymin) + self._refine_thet(dx, CmF, z, z2, z3, z4, z5) + k = int(self.kc + 2) + self.theta[k:] = CmF + if k > self.grid.N / 2: + raise ValueError + self.hvec = np.exp(-1j * ymin * self.grid.xi) * self.zeta + case _: + raise NotImplementedError + return + + def _refine_thet(self, dx, CmF, z, z2, z3, z4, z5): + self._refine_thet_sub1(dx) + self._refine_thet_sub2(dx, CmF, z, z2, z3, z4, z5) + self._refine_thet_sub3(dx, CmF, z, z2, z3, z4, z5) + self._refine_thet_sub4(dx, CmF, z, z2, z3, z4, z5) + self._refine_thet_sub5(dx, CmF, z4, z5) + + def _get_z(self, CG, dx, ymin): + z = self.grid.a * (CG - (ymin + (self.kc - 1) * dx)) + z2 = z**2 + z3 = z * z2 + z4 = z * z3 + z5 = z * z4 + return z, z2, z3, z4, z5 + + def _refine_thet_sub1(self, dx): + kc = self.kc + self.theta = np.zeros(int(self.grid.N / 2)) + self.theta[0] = dx / 120 + self.theta[1] = dx * 7 / 30 + self.theta[2] = dx * 121 / 120 + self.theta[3 : int(kc - 2)] = dx * np.arange(2, int(kc - 3)) + + def _refine_thet_sub5(self, dx, CmF, z4, z5): + k = int(self.kc + 2) + self.theta[k - 1] = dx * (z5 / 30 + (k - 4) * z4 / 24) + CmF * (1 - z4 / 24) + # self.theta[k : int(self.grid.N / 2)] = CmF + + def _refine_thet_sub4(self, dx, CmF, z, z2, z3, z4, z5): + k = int(self.kc + 1) + self.theta[k - 1] = dx * ( + k * (-z4 / 8 + z3 / 6 + z2 / 4 + z / 6 + 1 / 24) + - z5 / 10 + + z4 / 2 + - z3 / 3 + - 2 * z2 / 3 + - z / 2 + - 2 / 15 + ) + CmF * (0.5 + 1 / 24 * (3 * z4 - 4 * z3 - 6 * z2 - 4 * z + 11)) + + def _refine_thet_sub3(self, dx, CmF, z, z2, z3, z4, z5): + k = int(self.kc) + self.theta[k - 1] = dx * ( + k * (z4 / 8 - z3 / 3 + 2 * z / 3 + 0.5) + + z5 / 10 + - z4 / 2 + + 2 * z3 / 3 + + z2 / 3 + - 4 * z / 3 + - 37 / 30 + ) + CmF * (-z4 / 8 + z3 / 3 - 2 * z / 3 + 1 / 2) + + def _refine_thet_sub2(self, dx, CmF, z, z2, z3, z4, z5): + k = int(self.kc - 1) + self.theta[k - 1] = ( + dx + * ( + k * (-z4 / 24 + z3 / 6 - z2 / 4 + z / 6 + 23 / 24) + - z5 / 30 + + z4 / 6 + - z3 / 3 + + z2 / 3 + - z / 6 + - 59 / 30 + ) + + CmF * (z - 1) ** 4 / 24 + ) + + +class ProjCliquetPricer_SV: + def __init__(self, model: FourierModel, N: int = 2**10, L: float = 14): + self.L = L + self.N = N + self.model = model + + def _beta_computation(self, k: int): + beta = np.real( + np.fft.fft( + np.hstack( + ( + 1 / self.grid.A, + self.recursive_pricer.hvec + * self.recursive_pricer.phi_y_new[:, k], + ) + ) + ) + ) + return beta + + def price( + self, + T: float, + M: int, + W: float, + S0: float, + C: float, + CG: float, + F: float, + FG: float, + contract: int, + ): + + self._init_constants(W, S0, T, M, contract, C, F) + self.recursive_pricer._recursion() + self.recursive_pricer._refining(contract, CG, FG) + price = self._interpolation(contract, FG, W) + return price + + def _init_constants( + self, K: float, S0: float, T: float, M: float, contract: int, C: float, F: float + ): + self._init_classes(K, S0, T, M, contract, C, F) + self.recursive_pricer._get_PhiY(C, F, contract) + return + + def _init_classes(self, K, S0, T, M, contract, C, F): + alpha = AlphaRecursiveReturn(self.model, T, self.L)() + self.num_params = NumericalParams() + self.grid = GridParams(K, S0, self.N, alpha, T, M, C, F, contract) + self.exp_mat = ExponentialMat(self.grid, self.model, self.num_params, T) + # self.recursive_pricer = RecursivePricer( + # self.model, self.grid, self.num_params, self.exp_mat + # ) + self.recursive_pricer = RecursivePricer( + self.model, self.exp_mat, self.grid, self.num_params + ) + + def _interpolation(self, contract: int, FG: float, W: float): + if isinstance(self.model, TYPES.Hes_base): + k0 = self.recursive_pricer._get_k0() + disc = self.model.discountCurve.discount_T(self.grid.T) + v0 = self.model.v_0 + v = self.exp_mat.get_v() + match contract: + case 3: + val1 = self._get_val(FG, W, k0 - 1, disc) + val2 = self._get_val(FG, W, k0, disc) + price = val1 + (val2 - val1) * (v0 - v[k0 - 1]) / ( + v[k0] - v[k0 - 1] + ) + return price + case _: + raise NotImplementedError + else: + raise NotImplementedError + + def _get_val(self, FG, W, k, disc): + beta_temp = self._beta_computation(k) + val1 = ( + W + * disc + * ( + FG + + self.grid.C_an + * self.recursive_pricer.theta + @ beta_temp[: int(self.grid.N / 2)] + ) + ) + + return val1 diff --git a/fypy/pricing/fourier/StochVol/ProjDVSwap_SV.py b/fypy/pricing/fourier/StochVol/ProjDVSwap_SV.py new file mode 100644 index 0000000..63c27ce --- /dev/null +++ b/fypy/pricing/fourier/StochVol/ProjDVSwap_SV.py @@ -0,0 +1,205 @@ +import numpy as np + +# np.set_printoptions(precision=4) + +from fypy.model.FourierModel import FourierModel + +from copy import deepcopy + + +from fypy.pricing.fourier.StochVol.StochVolParams import ( + GridParamsGeneric, + NumericalParams, + ExponentialMat, + AlphaRecursiveReturn, + TYPES, +) +from fypy.pricing.fourier.StochVol.StochVolPricer import RecursiveReturnPricer + + +class GridParams(GridParamsGeneric): + def __init__(self, W: float, S0: float, N: int, alpha: float, T: float, M: int): + super().__init__(W, S0, N, alpha, T, M) + self.init_variables() + + def init_variables(self, **kwargs): + self.dx = 2 * self.alpha / (self.N - 1) + self.a = 1 / self.dx + self.dt = self.T / self.M + self.xmin = (1 - self.N / 2) * self.dx + self.A = 32 * self.a**4 + self.C_an = self.A / self.N + self.dxi = 2 * np.pi * self.a / self.N + self.xi = self.dxi * np.arange(self.N) + self.hlocalCF = lambda x: x**2 + return + + def conditional_variables(self): + return + + def update_and_create_variables(self): + return + + +class RecursivePricer(RecursiveReturnPricer): + def __init__( + self, + model: FourierModel, + mat: ExponentialMat, + grid: GridParamsGeneric, + num_params: NumericalParams, + ): + super().__init__(model, grid, num_params, mat) + + def _set_left_and_NMM(self, contract: int): + match contract: + case 1 | 3: + self.leftGridPoint = self.grid.xmin + self.NNM = self.grid.N + case _: + raise NotImplementedError( + "Only contracts 1 and 3 are implemented for now." + ) + return + + def _init_phi(self): + self.phi_old = np.zeros((self.grid.N - 1, self.num_params.m0), dtype="complex_") + self.phi_y_new = np.zeros( + (self.grid.N - 1, self.num_params.m0), dtype="complex_" + ) + return + + def _get_phy_new(self): + self.phi_old = self.transit_mat[:, :, 1:].sum(axis=0).T + to_fft = np.vstack( + ( + np.asarray([1 / self.grid.A] * self.num_params.m0), + self.phi_old * self.hvec[:, None], + ) + ).T + beta_temp = np.real(np.fft.fft(to_fft)) + self.phi_y_new = self.psi @ beta_temp.T + return + + def _get_phi(self): + to_fft = deepcopy(self.transit_mat) + to_fft[:, :, 0] = to_fft[:, :, 0] / self.grid.A + to_fft[:, :, 1:] = to_fft[:, :, 1:] * self.hvec[None, None, :] + beta_temp = np.real(np.fft.fft(to_fft)) + self.phi_y = np.matmul(beta_temp.transpose(1, 0, 2), self.psi.T) + return + + def _get_PhiY(self): + self._PSI_computation(1, self.grid.a) + self.transit_mat = self.exp_mat.get_exp_tensor() + self.zeta = self._get_zeta_vec(self.grid.a, self.grid.xi) + self.grid.xi = self.grid.xi[1:] + self.hvec = np.exp(-1j * self.grid.xmin * self.grid.xi) * self.zeta + self._init_phi() + self._get_phy_new() + self._get_phi() + return + + +class ProjDVSwap_SV: + def __init__(self, model: FourierModel, N: int = 2**10, L: float = 10.0): + self.L = L + self.N = N + self.model = model # model + + def _beta_computation(self, k: int): + beta = np.real( + np.fft.fft( + np.hstack( + (1 / self.grid.A, self.hvec * self.recursive_pricer.phi_y_new[:, k]) + ) + ) + ) + return beta + + def price( + self, + T: float, + M: int, + W: float, + contract: int, + ): + self._init_constants(W, T, M) + self.recursive_pricer._recursion() + price = self._interpolation(contract) + return price + + def _init_constants(self, K: float, T: float, M: float): + self._init_classes(K, T, M) + self.recursive_pricer._get_PhiY() + return + + def _init_classes(self, K, T, M): + alpha = AlphaRecursiveReturn(self.model, T, self.L)() + self.num_params = NumericalParams() + self.grid = GridParams(W=K, N=self.N, alpha=alpha, T=T, M=M, S0=1) + self.exp_mat = ExponentialMat(self.grid, self.model, self.num_params, T) + self.recursive_pricer = RecursivePricer( + self.model, self.exp_mat, self.grid, self.num_params + ) + + def _adjust_xmin_interp(self, contract: int): + match contract: + case 1: + self.grid.xmin = -self.grid.dx + case 3: + self.grid.xmin = self.grid.W * self.grid.T - self.grid.dx + case _: + raise NotImplementedError("No other contracts for now.") + return + + def _set_hvec(self): + match bool(self.grid.dx != 0): + case True: + self.hvec = ( + np.exp(-1j * self.grid.xmin * self.grid.xi) + * self.recursive_pricer.zeta + ) + case False: + self.hvec = self.recursive_pricer.zeta + return + + def _get_val(self, k: int, grid: np.ndarray): + beta_temp = self._beta_computation(k) + val = ( + self.grid.C_an + * grid[: int(self.grid.N / 2)] + @ beta_temp[: int(self.grid.N / 2)] + ) + return val + + def _interpolation(self, contract: int): + if isinstance(self.model, TYPES.Hes_base): + k0 = self.recursive_pricer._get_k0() + v0 = self.model.v_0 + v = self.exp_mat.get_v() + disc = self.model.discountCurve.discount_T(self.grid.T) + val1, val2 = self._get_vals(contract, k0) + price = val1 + (val2 - val1) * (v0 - v[k0 - 1]) / (v[k0] - v[k0 - 1]) + match contract: + case 3: + return disc * price / self.grid.T + case 1: + return price / self.grid.T + else: + raise NotImplementedError + + def _get_vals(self, contract, k0): + self._adjust_xmin_interp(contract) + self._set_hvec() + grid = self._get_grid_interp() + val1 = self._get_val(k0 - 1, grid) + val2 = self._get_val(k0, grid) + return val1, val2 + + def _get_grid_interp(self): + grid = self.grid.dx * np.arange(-1, self.grid.N - 1) + grid[0] = grid[0] / 24 + self.grid.dx / 20 + grid[1] = 7 * self.grid.dx / 30 + grid[2] = grid[2] * 23 / 24 + self.grid.dx / 20 + return grid diff --git a/fypy/pricing/fourier/StochVol/StochVolParams.py b/fypy/pricing/fourier/StochVol/StochVolParams.py new file mode 100644 index 0000000..c232d68 --- /dev/null +++ b/fypy/pricing/fourier/StochVol/StochVolParams.py @@ -0,0 +1,496 @@ +import numpy as np +import scipy + +from dataclasses import dataclass +from abc import abstractmethod, ABC +from typing import Union, Any, Optional, Callable +from fypy.model.sv.Heston import _HestonBase +from fypy.model.sv.HestonDEJumps import HestonDEJumps +from fypy.model.sv.Bates import Bates +from fypy.model.sv.Heston import Heston + +from fypy.model.FourierModel import FourierModel +from copy import deepcopy + +# from typing import Union + + +# From a list of array, return a tensor whose slcies are diagonal with the vector as diag +def diags_to_tens(vector_list: np.ndarray) -> np.ndarray: + vectors = np.array(vector_list) + tensor_shape = (*vectors.shape[:-1], vectors.shape[-1], vectors.shape[-1]) + tensor = np.zeros(tensor_shape, dtype="complex_") + idx = np.arange(vectors.shape[-1]) + tensor[..., idx, idx] = vectors + return tensor + + +# Scaling trick to compute exponential matrix without getting NaN +def custom_expm(matrix: np.ndarray, n: int = 4) -> np.ndarray: + return np.linalg.matrix_power(scipy.linalg.expm(matrix / 2**n), 2**n) + + +@dataclass +class NumericalParams: + m0: int = 30 + alpha: float = 6 + gamma: float = 3.3 + gridMethod: int = 4 + gridMultParam: float = 0.2 + boundaryMethod: int = 1 + + +@dataclass +class GaussianQuad: + q_plus: float = 0.5 * (1 + (3 / 5) ** 0.5) + q_minus: float = 0.5 * (1 - (3 / 5) ** 0.5) + b3: float = 15**0.5 + b4: float = b3 / 10 + + +class PayoffConstants: + def __init__(self, dx: float, b4: float, b3: float): + self.varthet_01 = ( + np.exp(0.5 * dx) * (5 * np.cosh(b4 * dx) - b3 * np.sinh(b4 * dx) + 4) / 18 + ) + self.varthet_m10 = ( + np.exp(-0.5 * dx) * (5 * np.cosh(b4 * dx) + b3 * np.sinh(b4 * dx) + 4) / 18 + ) + self.varthet_star = self.varthet_01 + self.varthet_m10 + + +class GridParamsGeneric(ABC): + def __init__( + self, + W: float, + S0: float, + N: int, + alpha: float, + T: float, + M: int, + H: Optional[int] = None, + down: Optional[bool] = None, + ): + # required variables + self.W = W + self.S0 = S0 + self.lws = np.log(W / S0) + self.N = N + self.alpha = alpha + self.H = H + self.T = T + self.M = M + self.dt = T / M + self.down = down + self.gauss_quad = GaussianQuad() + # to be init later + self.A = 0 + self.dx = 0 + self.K = 0 + self.a = 0 + self.rho = 0 + self.dxi = 0 + self.xmin: float = 0 + self.xbar = 0 + self.nbar = 0 + self.xi = np.ndarray([]) + self.upsilon = 0 + self.C_an = 0 + self.klf = 0 + self.klc = 0 + self.lf = 0 + self.nnot = 0 + self.gs = np.array([]) + self.zeta = 0 + self.pf_cons = PayoffConstants(self.dx, self.gauss_quad.b4, self.gauss_quad.b3) + self.thetM = np.ndarray([]) + self.hlocalCF: Optional[Callable] = None + + @abstractmethod + def init_variables(self, *args, **kwargs): + raise NotImplementedError + + @abstractmethod + def conditional_variables(self, *args, **kwargs): + raise NotImplementedError + + @abstractmethod + def update_and_create_variables(self): + raise NotImplementedError + + def _get_nnot(self): + return int(self.K / 2) + + def _get_xi(self): + return self.dxi * np.arange(self.N) + + def _get_zeta(self): + return self.a * self.rho + + def _get_rho(self): + return self.lws - (self.xmin + (self.nbar - 1) * self.dx) + + def _get_K(self): + return int(self.N / 2) + + def _get_a(self): + return 1 / self.dx + + def _get_dx(self): + return 2 * self.alpha / (self.N - 1) + + def _get_xmin(self) -> float: + return (1 - self.K / 2) * self.dx + + def _get_dxi(self) -> float: + return 2 * np.pi * self.a / self.N + + def _get_nbar(self) -> int: + return int(self.a * (self.lws - self.xmin) + 1) + + def _get_dxtil(self) -> float: + return self.dx + + def _get_gs(self) -> np.ndarray: + try: + gs = np.zeros(self.K) + gs[: self.nbar] = self.W - self.S0 * np.exp( + self.xmin + self.dx * np.arange(self.nbar) + ) + return gs + except: + raise NotImplementedError + + def _get_thetM(self) -> np.ndarray: + try: + thetM = np.zeros(self.K) + thetM[: self.nbar - 1] = self.W - self.pf_cons.varthet_star * ( + self.S0 * np.exp(self.xmin + self.dx * np.arange(self.nbar - 1)) + ) + thetM[self.nbar - 1] = self.W * (0.5 - self.pf_cons.varthet_m10) + return thetM + except: + raise ValueError + + ######################################### + ######## Constants for grid bounds ###### + ######################################### + + def _get_var_grid_bound( + self, model: FourierModel, T: float, gamma: float + ) -> tuple[float, float]: + if isinstance(model, _HestonBase): + mu_h = np.exp(-model.kappa * T) * model.v_0 + model.theta * ( + 1 - np.exp(-model.kappa * T) + ) + sig2_h = self._get_sigma_heston(model, T) + lx = max(0.00001, mu_h - gamma * sig2_h**0.5) + ux = mu_h + gamma * sig2_h**0.5 + return ux, lx + else: + raise NotImplementedError("Only implemented for Heston so far.") + + def _get_sigma_heston(self, model: _HestonBase, T: float) -> float: + return ( + model.sigma_v**2 + / (model.kappa) + * ( + model.v_0 * (np.exp(-model.kappa * T) - np.exp(-2 * model.kappa * T)) + + 0.5 + * model.theta + * (1 - 2 * np.exp(-model.kappa * T) + np.exp(-2 * model.kappa * T)) + ) + ) + + def _get_mu_heston(self, model: _HestonBase, T: float): + return np.exp(-model.kappa * T) * model.v_0 + model.theta * ( + 1 - np.exp(-model.kappa * T) + ) + + +class ExponentialMat: + def __init__( + self, + grid: GridParamsGeneric, + model: FourierModel, + num_params: NumericalParams, + T: float, + ): + self.grid = grid + self.model = model + self.boundary_method = 1 + self.num_params = num_params + self.T = T + self.ux, self.lx = self.grid._get_var_grid_bound( + self.model, self.T / 2, self.num_params.gamma + ) + + def get_v(self) -> np.ndarray: + # common parameters for all gridMethod + m0 = self.num_params.m0 + nx = m0 + dx = (self.ux - self.lx) / nx + if isinstance(self.model, TYPES.Hes_base): + center = self.model.v_0 + else: + raise NotImplementedError + # different gridMethod case + match self.num_params.gridMethod: + case 1: + return self.lx + dx * np.arange(1, nx + 1) + case 4: + v = self._get_v_grid4(m0, center) + return v + case _: + raise NotImplementedError("Only gridMethod [1,4] implemented so far.") + + def _get_v_grid4(self, m0: int, center: float) -> np.ndarray: + v = np.zeros(m0) + v[0] = self.lx + v[-1] = self.ux + alpha_mult = self.num_params.gridMultParam * (v[-1] - v[0]) + c1, c2 = np.arcsinh((v[[0, -1]] - center) / alpha_mult) + sub_vec = c2 / m0 * np.arange(2, m0) + c1 * (1 - np.arange(2, m0) / m0) + v[1:-1] = center + alpha_mult * np.sinh(sub_vec) + return v + + ################################### + ####### Build the matrix Q ######## + ################################### + + def _get_Q(self, v: np.ndarray) -> np.ndarray: + m0 = self.num_params.m0 + mu_vec = self._mu_func(v) + mu_plus, mu_minus = mu_vec * (mu_vec > 0), -mu_vec * (-mu_vec > 0) + sig2 = self._sig_func(v) ** 2 + h = np.diff(v) + + match self.num_params.gridMethod: + case 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9: + Q = self._get_Q_non_unif(v, mu_plus, mu_minus, sig2, h) + case _: + raise NotImplementedError("Only gridMethod [1-> 9] implemented so far.") + + match self.num_params.boundaryMethod: + case 1: + return self._get_Q_boundaries(m0, Q, mu_vec, h) + case _: + raise NotImplementedError("Only boundMethod 1 implemented so far.") + + def _get_Q_non_unif( + self, + v: np.ndarray, + mu_plus: np.ndarray, + mu_minus: np.ndarray, + sig2: np.ndarray, + h: np.ndarray, + ) -> np.ndarray: + zeros = np.zeros(len(v) - 2) + sub_vec = sig2[1:-1] - (h[1:] * mu_plus[1:-1] + h[:-1] * mu_minus[1:-1]) + aa = np.max([sub_vec, zeros], axis=0) / (h[:-1] + h[1:]) + sub_diag = (mu_minus[1:-1] + aa) / h[:-1] + up_diag = (mu_plus[1:-1] + aa) / h[1:] + diag = np.hstack((0, sub_diag + up_diag, 0)) + sub_diag = np.hstack((sub_diag, 0)) + up_diag = np.hstack((0, up_diag)) + Q = np.diag(sub_diag, k=-1) + np.diag(up_diag, k=+1) - np.diag(diag) + return Q + + def _get_Q_boundaries( + self, m0: int, Q: np.ndarray, mu_vec: np.ndarray, h: np.ndarray + ) -> np.ndarray: + Q[0, 1] = np.abs(mu_vec[0]) / h[0] + Q[0, 0] = -Q[0, 1] + Q[m0 - 1, m0 - 2] = np.abs(mu_vec[-1]) / h[-1] + Q[m0 - 1, m0 - 1] = -Q[m0 - 1, m0 - 2] + return Q + + def _mu_func(self, u: np.ndarray) -> np.ndarray: ## + if isinstance(self.model, _HestonBase): + return self.model.kappa * (self.model.theta - u) + else: + raise NotImplementedError("Only implemented for Heston so far.") + + def _sig_func(self, u: np.ndarray) -> np.ndarray: + if isinstance(self.model, _HestonBase): + return self.model.sigma_v * u**0.5 + + else: + raise NotImplementedError("Only implemented for Heston so far.") + + ################################### + ####### END Matrix Q ######## + ################################### + + def get_exp_tensor(self) -> np.ndarray: + v = self.get_v() + fv = self._get_fv(v) + exp_A = self._get_exp_diags(v) + lambda_exp_power = self._get_lambda_exp(fv) + exp_A[..., 1:] = exp_A[..., 1:] * lambda_exp_power + return exp_A + + def _get_exp_diags(self, v: np.ndarray): + Q = self._get_Q(v) + v1 = self._get_v1(v) + v2 = self._get_v2(v) + Qt = self.grid.dt * Q.T + psi_J = self._get_psi_J() + diags_mat = ( + self.grid.xi[:, None] @ v1[None, :] + - (self.grid.xi.T[:, None] ** 2) @ v2[None, :] + + self.grid.dt * psi_J(self.grid.xi)[:, None] + ) + diag_tensor = diags_to_tens(diags_mat) + Qt + exp_diags = custom_expm(diag_tensor) + return exp_diags.transpose(1, 2, 0) + + def _get_lambda_exp(self, fv: np.ndarray) -> np.ndarray: + m0 = self.num_params.m0 + hor_fv = np.repeat(fv[:, None], m0, axis=1) + vert_fv = np.repeat(fv[None, :], m0, axis=0) + upper_part = np.triu(hor_fv - vert_fv, k=1) + lower_part = -upper_part.T + lambda_dxi = lower_part + upper_part + lambda_exp = np.exp(lambda_dxi) + lambda_exp_tens = np.repeat(lambda_exp[None, :], self.grid.N - 1, axis=0) + lambda_exp_power = np.power( + lambda_exp_tens, np.arange(1, self.grid.N)[..., None, None] + ) + return lambda_exp_power.transpose(1, 2, 0) + + def _get_v1(self, v: np.ndarray) -> np.ndarray: + if isinstance(self.model, _HestonBase): + c1 = (self.model.rho * self.model.kappa / self.model.sigma_v) - 0.5 + rn_drift = self.model.forwardCurve.drift(0, self.grid.T) + c2 = ( + rn_drift + - self.model.rho + * self.model.theta + * self.model.kappa + / self.model.sigma_v + ) + psi_J = self._get_psi_J() + return self.grid.dt * 1j * (c1 * v + c2 - psi_J(-1j)) + else: + raise NotImplementedError("Only implemented for Heston so far.") + + def _get_psi_J(self) -> Callable: + if isinstance(self.model, TYPES.HesDE): + lam, p_up, eta1, eta2 = ( + self.model.lam, + self.model.p_up, + self.model.eta1, + self.model.eta2, + ) + fun = lambda xi: lam * ( + (1 - p_up) * eta2 / (eta2 + 1j * xi) + + p_up * eta1 / (eta1 - 1j * xi) + - 1 + ) + return fun + else: + raise NotImplementedError + + def _get_v2(self, v: np.ndarray) -> np.ndarray: + if isinstance(self.model, _HestonBase): + c3 = 0.5 * (1 - self.model.rho**2) + return self.grid.dt * c3 * v + else: + raise NotImplementedError("Only implemented for Heston so far.") + + def _get_fv(self, v: np.ndarray) -> np.ndarray: + if isinstance(self.model, _HestonBase): + fv = (1j * self.grid.dxi * self.model.rho / self.model.sigma_v) * v + return fv + else: + raise NotImplementedError("Only implemented for Heston so far.") + + +class Toeplitz: + def __init__(self, grid: GridParamsGeneric, model: FourierModel): + self.model = model + self.grid = grid + + def get_beta(self, exp_mat: np.ndarray): + grand_tens = exp_mat[:, :, 1:] * self.hvec + stacked = np.dstack((exp_mat[:, :, [0]] / (24 * self.grid.a**2), grand_tens)) + beta_tens = self.cons * np.real(np.fft.fft(stacked, axis=-1)) + first_range = beta_tens[..., self.grid.K - 1 :: -1] + second_range = beta_tens[..., 2 * self.grid.K - 2 : self.grid.K - 1 : -1] + toepM = np.dstack( + ( + first_range, + np.zeros((first_range.shape[0], first_range.shape[0])), + second_range, + ) + ) + beta_fft = np.fft.fft(toepM) + return beta_fft + + @property + def zmin(self): + return (1 - self.grid.K) * self.grid.dx + + @property + def hvec(self): + return ( + np.exp(-1j * self.zmin * self.xi) + * (np.sin(self.xi / (2 * self.grid.a)) / self.xi) ** 2 + / (2 + np.cos(self.xi / self.grid.a)) + ) + + @property + def xi(self): + return self.grid.dxi * np.arange(1, self.grid.N) + + @property + def cons(self): + r = self.model.forwardCurve.drift(0, self.grid.T) + return 24 * self.grid.a**2 * np.exp(-r * self.grid.dt) / self.grid.N + + +class AlphaRecursiveReturn: + def __init__(self, model: FourierModel, T: float, L: float): + self._model = model + self.T = T + self._L = L + + def _get_alpha_variance(self): + if isinstance(self._model, _HestonBase): + t = self.T / 2 + eta = self._model.kappa + v0 = self._model.v_0 + theta = self._model.theta + mu_h = (np.exp(-eta * t) * v0 + theta * (1 - np.exp(-eta * t))) ** 0.5 + if isinstance(self._model, TYPES.HesDE): + lam, p_up, eta1, eta2 = ( + self._model.lam, + self._model.p_up, + self._model.eta1, + self._model.eta2, + ) + c2_jump = 2 * lam * p_up / (eta1**2) + 2 * lam * (1 - p_up) / (eta2**2) + c4_jump = 24 * lam * (p_up / eta1**4 + (1 - p_up) / eta2**4) + c2 = c2_jump + (mu_h) ** 2 + alpha = max( + self._L * (c2 * t + (c4_jump * self.T) ** 0.5) ** 0.5, + 0.5, + ) + return alpha + else: + raise NotImplementedError("Only HKDE implemented so far.") + + else: + raise NotImplementedError("Only Heston implemented so far.") + + def __call__(self): + return self._get_alpha_variance() + + +@dataclass +class TYPES: + HesDE = HestonDEJumps + Bates = Bates + Hes = Heston + Hes_base = _HestonBase diff --git a/fypy/pricing/fourier/StochVol/StochVolPricer.py b/fypy/pricing/fourier/StochVol/StochVolPricer.py new file mode 100644 index 0000000..8460131 --- /dev/null +++ b/fypy/pricing/fourier/StochVol/StochVolPricer.py @@ -0,0 +1,290 @@ +import numpy as np +from abc import abstractmethod +from fypy.model.FourierModel import FourierModel +from typing import Optional, Any + +from fypy.pricing.fourier.StochVol.StochVolParams import ( + ExponentialMat, + Toeplitz, + GridParamsGeneric, + NumericalParams, + TYPES, +) + + +class RecursiveReturnPricer: + def __init__( + self, + model: FourierModel, + grid: GridParamsGeneric, + num_params: NumericalParams, + exp_mat: ExponentialMat, + ): + self._model = model + self.exp_mat = exp_mat + self.grid = grid + self.num_params = num_params + self.NNM = 0 + self.leftGridPoint = 0 + + def _get_k0(self) -> int: + k0 = 2 + v = self.exp_mat.get_v() + if isinstance(self._model, TYPES.Hes_base): + while self._model.v_0 > v[k0 - 1] and k0 < self.num_params.m0: + k0 += 1 + k0 -= 1 + else: + raise NotImplementedError + return k0 + + def _get_zeta_vec(self, a: float, xi: np.ndarray): + b0 = 1208 / 2520 + b1 = 1191 / 2520 + b2 = 120 / 2520 + b3 = 1 / 2520 + w = xi[1:] + zeta = np.empty_like(w, dtype=complex) + zeta = (np.sin(w / (2 * a)) / w) ** 4 / ( + b0 + b1 * np.cos(w / a) + b2 * np.cos(2 * w / a) + b3 * np.cos(3 * w / a) + ) + return zeta + + def _sig_computation(self): + # Gaussian quadrature coefs + g2 = np.sqrt(5 - 2 * np.sqrt(10 / 7)) / 6 + g3 = np.sqrt(5 + 2 * np.sqrt(10 / 7)) / 6 + v1 = 0.5 * 128 / 225 + v2 = 0.5 * (322 + 13 * np.sqrt(70)) / 900 + v3 = 0.5 * (322 - 13 * np.sqrt(70)) / 900 + # gamma coefs: Appendix A.2.1 + sig = np.array( + [ + -1.5 - g3, + -1.5 - g2, + -1.5, + -1.5 + g2, + -1.5 + g3, + -0.5 - g3, + -0.5 - g2, + -0.5, + -0.5 + g2, + -0.5 + g3, + ] + ) + # cubic spline: sig = phi[3](gamma)*w + sig[0:5] = (sig[0:5] + 2) ** 3 / 6 + sig[5:10] = 2 / 3 - 0.5 * sig[5:10] ** 3 - sig[5:10] ** 2 + # multiplication by w (denoted as v hat) + sig[np.array([0, 4, 5, 9])] *= v3 + sig[np.array([1, 3, 6, 8])] *= v2 + sig[np.array([2, 7])] *= v1 + return self.grid.C_an * sig + + def _thet_computation(self, dx: float, x1: np.ndarray, Neta: int, Neta5: int): + # Computation of thet is required for the _PSI_computation method, + # which itself is essential for the functioning of the _beta_computation method + g2 = np.sqrt(5 - 2 * np.sqrt(10 / 7)) / 6 + g3 = np.sqrt(5 + 2 * np.sqrt(10 / 7)) / 6 + thet = np.zeros(Neta) + thet[5 * np.arange(1, Neta5 + 1) - 3] = x1[0] - 1.5 * dx + dx * np.arange(Neta5) + thet[5 * np.arange(1, Neta5 + 1) - 5] = ( + x1[0] - 1.5 * dx + dx * np.arange(Neta5) - dx * g3 + ) + thet[5 * np.arange(1, Neta5 + 1) - 4] = ( + x1[0] - 1.5 * dx + dx * np.arange(Neta5) - dx * g2 + ) + thet[5 * np.arange(1, Neta5 + 1) - 2] = ( + x1[0] - 1.5 * dx + dx * np.arange(Neta5) + dx * g2 + ) + thet[5 * np.arange(1, Neta5 + 1) - 1] = ( + x1[0] - 1.5 * dx + dx * np.arange(Neta5) + dx * g3 + ) + return thet + + def _set_zz(self, thet: np.ndarray, a: float): + dxi = 2 * np.pi * a / self.grid.N + if self.grid.hlocalCF is not None: + self.zz = np.exp(1j * dxi * self.grid.hlocalCF(thet)) + else: + raise ValueError("The method hlocalCF should be initialized.") + return + + def _set_thet(self): + self.thet = self.zz + return + + def _update_thet(self): + self.thet = self.thet * self.zz + return + + def _build_psi_col(self, sig: np.ndarray, Neta: int, j: int): + thet = self.thet + psi_j = [] + + for i in range(10): + psi_ji = list( + sig[i] + * (thet[i : Neta - (19 - i) : 5] + thet[(19 - i) : (Neta - i) : 5]) + ) + psi_j.append(psi_ji) + + psi_j = np.array(psi_j).sum(axis=0) + self.psi[j, :] = psi_j + + def _PSI_computation( + self, contract: Optional[float] = None, a: Optional[float] = None + ): + if contract is None: + self._set_left_and_NMM() + else: + self._set_left_and_NMM(contract=contract) + + if a is not None: + dx = 1 / a + else: + raise ValueError("a should be a real number.") + NNM = self.NNM # Number of columns of PSI + Neta = 5 * NNM + 15 # Sample size + Neta5 = NNM + 3 + + self.thet = self._thet_computation( + dx=dx, x1=np.array([self.leftGridPoint]), Neta=Neta, Neta5=Neta5 + ) + sig = self._sig_computation() + self._set_zz(self.thet, a) + self._set_thet() + + # PSI Matrix: 5-Point GAUSSIAN + self.psi = np.zeros( + (self.grid.N - 1, NNM), dtype=np.float64 + ) # The first row will remain ones + self.psi = self.psi.astype(np.complex128) + + for j in range(self.grid.N - 1): + self._build_psi_col(sig, Neta, j) + self._update_thet() + return + + def _recursion(self): + self.phi_y = self.phi_y.transpose(1, 0, 2) + for _ in range(2, self.grid.M + 1): + self.phi_y_new = np.einsum("ab,bca->ac", self.phi_y_new, self.phi_y) + return + + @abstractmethod + def _set_left_and_NMM(self, *args, **kwargs: Any): + raise NotImplementedError + + @abstractmethod + def get_PhiY(self): + raise NotImplementedError + + +class NonRecursivePriceGeneric: + def __init__( + self, + mat: ExponentialMat, + toep: Toeplitz, + grid: GridParamsGeneric, + num_params: NumericalParams, + ): + self.mat = mat + if not isinstance(self.mat.model, TYPES.Hes_base): + raise NotImplementedError + self.toep = toep + self.grid = grid + self.num_params = num_params + self.beta = self._get_beta() + self._init_variables() + + def _get_beta(self) -> np.ndarray: + exp_mat = self.mat.get_exp_tensor() + beta = self.toep.get_beta(exp_mat).transpose((1, 0, 2)) + return beta + + def recursive_price(self) -> float: + self._thet = self._init_thet() + self.cont = self._create_cont() + for _ in np.arange(0, self.grid.M - 1)[::-1]: + self._time_recursion() + price = self._interp_price() + return price + + @abstractmethod + def _init_thet(self): + raise NotImplementedError + + def _time_recursion(self) -> None: + for j in range(self.num_params.m0): + self._update_thet(j) + self._update_cont() + return + + @abstractmethod + def _update_thet(self, j: int) -> None: + raise NotImplementedError + + ######################## + ###### CONT Array ###### + ######################## + + def _update_cont(self) -> None: + self.cont = np.zeros((self.grid.K, self.num_params.m0)) + for k in range(self.num_params.m0): + stacked = np.hstack((self._thet[: self.grid.K, k], np.zeros(self.grid.K))) + thet_temp = np.fft.fft(stacked) + for j in range(self.num_params.m0): + p = np.real(np.fft.ifft(self.beta[j, k, :] * thet_temp)) + self.cont[:, j] += p[: self.grid.K] + + # different from ameerican + def _create_cont(self) -> np.ndarray: + cont = np.zeros((self.grid.K, self.num_params.m0)) + vec_thet_update = self._vec_for_thet_update() + stacked = np.hstack((vec_thet_update, np.zeros(self.grid.K))) + thet_temp = np.fft.fft(stacked) + for j in range(self.num_params.m0): + for k in range(self.num_params.m0): + p = np.real(np.fft.ifft(self.beta[j, k, :] * thet_temp)) + cont[:, j] += p[: self.grid.K] + return cont + + @abstractmethod + def _vec_for_thet_update(self): + raise NotImplementedError + + ######################### + ##### Interp Price ###### + ######################### + def _get_k0(self) -> int: + k0 = 2 + v = self.mat.get_v() + if isinstance(self.mat.model, TYPES.Hes_base): + while self.mat.model.v_0 > v[k0 - 1] and k0 < self.num_params.m0: + k0 += 1 + k0 -= 1 + return k0 + + else: + raise NotImplementedError + + def _interp_price(self) -> float: + k0 = self._get_k0() + v = self.mat.get_v() + val1, val2 = self._get_val(k0) + if isinstance(self.mat.model, TYPES.Hes_base): + price = val1 + (val2 - val1) * (self.mat.model.v_0 - v[k0 - 1]) / ( + v[k0] - v[k0 - 1] + ) + else: + raise NotImplementedError + return price + + @abstractmethod + def _get_val(self, k0: int): + raise NotImplementedError + + @abstractmethod + def _init_variables(self): + raise NotImplementedError diff --git a/fypy/pricing/fourier/StochVol/__init__.py b/fypy/pricing/fourier/StochVol/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt index 4e04d64..1609ac0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,5 @@ py_lets_be_rational==1.0.1 requests_cache==1.1.0 scipy==1.11.1 torch==2.0.1 -yfinance==0.2.26 \ No newline at end of file +yfinance==0.2.26 +tqdm==4.66.1 \ No newline at end of file diff --git a/tests/numerical_values_for_testing/AsianStoPrices.mat b/tests/numerical_values_for_testing/AsianStoPrices.mat new file mode 100644 index 0000000..4dad87a Binary files /dev/null and b/tests/numerical_values_for_testing/AsianStoPrices.mat differ diff --git a/tests/numerical_values_for_testing/BarrierStoPrices.mat b/tests/numerical_values_for_testing/BarrierStoPrices.mat new file mode 100644 index 0000000..58949ff Binary files /dev/null and b/tests/numerical_values_for_testing/BarrierStoPrices.mat differ diff --git a/tests/numerical_values_for_testing/BermudanStoPrices.mat b/tests/numerical_values_for_testing/BermudanStoPrices.mat new file mode 100644 index 0000000..55ca392 Binary files /dev/null and b/tests/numerical_values_for_testing/BermudanStoPrices.mat differ diff --git a/tests/numerical_values_for_testing/CliquetStoPrices.mat b/tests/numerical_values_for_testing/CliquetStoPrices.mat new file mode 100644 index 0000000..c5a1b2e Binary files /dev/null and b/tests/numerical_values_for_testing/CliquetStoPrices.mat differ diff --git a/tests/numerical_values_for_testing/DVSwapStoPrices.mat b/tests/numerical_values_for_testing/DVSwapStoPrices.mat new file mode 100644 index 0000000..4c75144 Binary files /dev/null and b/tests/numerical_values_for_testing/DVSwapStoPrices.mat differ diff --git a/tests/pricing/fourier/Test_GilPeleaz_European.py b/tests/pricing/fourier/Test_GilPeleaz_European.py index 21465af..d70c5a5 100644 --- a/tests/pricing/fourier/Test_GilPeleaz_European.py +++ b/tests/pricing/fourier/Test_GilPeleaz_European.py @@ -31,4 +31,4 @@ def test_price_black_scholes(self): if __name__ == '__main__': - unittest.main() + unittest.main() \ No newline at end of file diff --git a/tests/pricing/fourier/Test_Proj_Asian.py b/tests/pricing/fourier/Test_Proj_Asian.py index bfb52c5..4b41e5e 100644 --- a/tests/pricing/fourier/Test_Proj_Asian.py +++ b/tests/pricing/fourier/Test_Proj_Asian.py @@ -70,4 +70,4 @@ def test_arithmetic_asian_bilateral_gamma_motion(self): np.where(K == k)[0][0], np.where(T == t)[0][0], np.where(M == m)[0][0]], 10) if __name__ == '__main__': - unittest.main() + unittest.main() \ No newline at end of file diff --git a/tests/pricing/fourier/Test_Proj_Barrier.py b/tests/pricing/fourier/Test_Proj_Barrier.py index 3c16901..34cf633 100644 --- a/tests/pricing/fourier/Test_Proj_Barrier.py +++ b/tests/pricing/fourier/Test_Proj_Barrier.py @@ -1,85 +1,85 @@ -import os -import unittest - -from scipy.io import loadmat as loadmat - -from fypy.model.levy.BilateralGamma import BilateralGammaMotion -from fypy.model.levy.BlackScholes import * -from fypy.pricing.fourier.ProjBarrierPricer import ProjBarrierPricer -from fypy.termstructures.DiscountCurve import DiscountCurve_ConstRate -from fypy.termstructures.EquityForward import EquityForward - - -class Test_Proj_Barrier(unittest.TestCase): - def test_barrier_bilateral_gamma_motion(self): - - # N.B. Regarding alphas computation, check _TODO_ comment in ProjBarrier - - - # Load of MATLAB results - # Get the absolute path to the directory of the current script - script_dir = os.path.dirname(os.path.abspath(__file__)) - - # Construct the absolute path to the .mat file - file_path = os.path.join(script_dir, '..', '..', 'numerical_values_for_testing', 'prices_barrier_mat.mat') - - # Load the .mat file - matlab_prices = loadmat(file_path)['prices'] - - # Model and Pricer creation - S0 = 100 - r = 0.05 - q = 0.02 - N = 2 ** 14 - disc_curve = DiscountCurve_ConstRate(rate=r) - div_disc = DiscountCurve_ConstRate(rate=q) - fwd = EquityForward(S0=S0, discount=disc_curve, divDiscount=div_disc) - model = BilateralGammaMotion(forwardCurve=fwd, discountCurve=disc_curve, alpha_p=1.3737 * 10 ** (-11), - lambda_p=90.6317, alhpa_m=0.4331, lambda_m=2.4510, sigma=0.2725) - pricer = ProjBarrierPricer(model=model, N=N) - - T = np.asarray([0.1, 0.7, 1, 1.9]) # Time (in years) - K = np.arange(80, 201, 2) # Strike - M = np.asarray([40, 65]) # number of discrete monitoring points - H = 70 # barrier - rebate = 5 - is_calls = np.empty(len(K)) - is_calls.fill(True) - - # Testing multi-strike method - for t in T: - for m in M: - prices = pricer.price_strikes(T=t, - M=m, - H=H, - down=True, - rebate=rebate, - K=K, - is_calls=is_calls) - for k in range(len(K)): - self.assertAlmostEqual(prices[k], matlab_prices[ - np.where(T == t)[0][0], k, np.where(M == m)[0][0]], 6) - - # Testing single-strike method, selecting central elements - - inner_T = [T[len(T) // 2 - 1], T[len(T) // 2]] if len(T) % 2 == 0 else [T[len(T) // 2]] - inner_K = [K[len(K) // 2 - 1], K[len(K) // 2]] if len(K) % 2 == 0 else [K[len(K) // 2]] - inner_M = [M[len(M) // 2 - 1], M[len(M) // 2]] if len(M) % 2 == 0 else [M[len(M) // 2]] - - for t in inner_T: - for k in inner_K: - for m in inner_M: - single_price = pricer.price(T=t, - M=m, - H=H, - down=True, - rebate=rebate, - K=k, - is_call=is_calls[0]) - - self.assertAlmostEqual(single_price, matlab_prices[ - np.where(T == t)[0][0], np.where(K == k)[0][0], np.where(M == m)[0][0]], 6) - - -if __name__ == '__main__': - unittest.main() +import os +import unittest + +from scipy.io import loadmat as loadmat + +from fypy.model.levy.BilateralGamma import BilateralGammaMotion +from fypy.model.levy.BlackScholes import * +from fypy.pricing.fourier.ProjBarrierPricer import ProjBarrierPricer +from fypy.termstructures.DiscountCurve import DiscountCurve_ConstRate +from fypy.termstructures.EquityForward import EquityForward + + +class Test_Proj_Barrier(unittest.TestCase): + def test_barrier_bilateral_gamma_motion(self): + + # N.B. Regarding alphas computation, check _TODO_ comment in ProjBarrier + + + # Load of MATLAB results + # Get the absolute path to the directory of the current script + script_dir = os.path.dirname(os.path.abspath(__file__)) + + # Construct the absolute path to the .mat file + file_path = os.path.join(script_dir, '..', '..', 'numerical_values_for_testing', 'prices_barrier_mat.mat') + + # Load the .mat file + matlab_prices = loadmat(file_path)['prices'] + + # Model and Pricer creation + S0 = 100 + r = 0.05 + q = 0.02 + N = 2 ** 14 + disc_curve = DiscountCurve_ConstRate(rate=r) + div_disc = DiscountCurve_ConstRate(rate=q) + fwd = EquityForward(S0=S0, discount=disc_curve, divDiscount=div_disc) + model = BilateralGammaMotion(forwardCurve=fwd, discountCurve=disc_curve, alpha_p=1.3737 * 10 ** (-11), + lambda_p=90.6317, alhpa_m=0.4331, lambda_m=2.4510, sigma=0.2725) + pricer = ProjBarrierPricer(model=model, N=N) + + T = np.asarray([0.1, 0.7, 1, 1.9]) # Time (in years) + K = np.arange(80, 201, 2) # Strike + M = np.asarray([40, 65]) # number of discrete monitoring points + H = 70 # barrier + rebate = 5 + is_calls = np.empty(len(K)) + is_calls.fill(True) + + # Testing multi-strike method + for t in T: + for m in M: + prices = pricer.price_strikes(T=t, + M=m, + H=H, + down=True, + rebate=rebate, + K=K, + is_calls=is_calls) + for k in range(len(K)): + self.assertAlmostEqual(prices[k], matlab_prices[ + np.where(T == t)[0][0], k, np.where(M == m)[0][0]], 6) + + # Testing single-strike method, selecting central elements + + inner_T = [T[len(T) // 2 - 1], T[len(T) // 2]] if len(T) % 2 == 0 else [T[len(T) // 2]] + inner_K = [K[len(K) // 2 - 1], K[len(K) // 2]] if len(K) % 2 == 0 else [K[len(K) // 2]] + inner_M = [M[len(M) // 2 - 1], M[len(M) // 2]] if len(M) % 2 == 0 else [M[len(M) // 2]] + + for t in inner_T: + for k in inner_K: + for m in inner_M: + single_price = pricer.price(T=t, + M=m, + H=H, + down=True, + rebate=rebate, + K=k, + is_call=is_calls[0]) + + self.assertAlmostEqual(single_price, matlab_prices[ + np.where(T == t)[0][0], np.where(K == k)[0][0], np.where(M == m)[0][0]], 6) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/pricing/fourier/Test_Proj_Sto_Asian.py b/tests/pricing/fourier/Test_Proj_Sto_Asian.py new file mode 100644 index 0000000..a3354cf --- /dev/null +++ b/tests/pricing/fourier/Test_Proj_Sto_Asian.py @@ -0,0 +1,36 @@ +import unittest + +from fypy.pricing.fourier.StochVol.ProjAsianPricer_SV import ProjAsianPricer_SV +from Test_StochVol_Helper import GenericTest + + +class Test_Proj_Asian(unittest.TestCase, GenericTest): + def __init__(self, methodName="runTest"): + GenericTest.__init__(self, option_name="Asian") + unittest.TestCase.__init__(self, methodName) + + def check_equality(self, price, matlab_price): + self.assertAlmostEqual(price, matlab_price, 7) + + def _get_price(self, list_index): + idx_W, idx_T, idx_M = list_index + price = self.pricer.price( + T=self.matlab.T[idx_T], + M=self.matlab.M[idx_M], + W=self.matlab.W[idx_W], + S0=self.mkt.S0, + is_call=True, + ) + return price + + def _set_option_constants(self): + return + + def _set_pricer(self): + self.pricer = ProjAsianPricer_SV( + model=self.model, P=self.pricer_params.P, Pbar=self.pricer_params.Pbar + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/pricing/fourier/Test_Proj_Sto_Barrier.py b/tests/pricing/fourier/Test_Proj_Sto_Barrier.py new file mode 100644 index 0000000..8b9c61d --- /dev/null +++ b/tests/pricing/fourier/Test_Proj_Sto_Barrier.py @@ -0,0 +1,37 @@ +import unittest + +from fypy.pricing.fourier.StochVol.ProjBarrierPricer_SV import ProjBarrierPricer_SV +from Test_StochVol_Helper import GenericTest + + +class Test_Proj_Barrier(unittest.TestCase, GenericTest): + def __init__(self, methodName="runTest"): + unittest.TestCase.__init__(self, methodName) + GenericTest.__init__(self, option_name="Barrier") + + def check_equality(self, price, matlab_price): + self.assertAlmostEqual(price, matlab_price, 7) + + def _get_price(self, list_index): + idx_W, idx_T, idx_M, idx_H = list_index + price = self.pricer.price( + T=self.matlab.T[idx_T], + M=self.matlab.M[idx_M], + W=self.matlab.W[idx_W], + S0=self.mkt.S0, + H=self.matlab.H[idx_H], + down=True, + is_call=True, + ) + return price + + def _set_option_constants(self): + self.contract = 1 + return + + def _set_pricer(self): + self.pricer = ProjBarrierPricer_SV(model=self.model, N=self.pricer_params.N) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/pricing/fourier/Test_Proj_Sto_Bermudan.py b/tests/pricing/fourier/Test_Proj_Sto_Bermudan.py new file mode 100644 index 0000000..7018527 --- /dev/null +++ b/tests/pricing/fourier/Test_Proj_Sto_Bermudan.py @@ -0,0 +1,35 @@ +import unittest + +from fypy.pricing.fourier.StochVol.ProjBermudanPricer_SV import ProjBermudanPricer_SV +from Test_StochVol_Helper import GenericTest + + +class Test_Proj_Bermudan(unittest.TestCase, GenericTest): + def __init__(self, methodName="runTest"): + unittest.TestCase.__init__(self, methodName) + GenericTest.__init__(self, option_name="Bermudan") + + def check_equality(self, price, matlab_price): + self.assertAlmostEqual(price, matlab_price, 7) + + def _get_price(self, list_index): + idx_W, idx_T, idx_M = list_index + price = self.pricer.price( + T=self.matlab.T[idx_T], + M=self.matlab.M[idx_M], + W=self.matlab.W[idx_W], + S0=self.mkt.S0, + is_call=False, + ) + return price + + def _set_option_constants(self): + self.contract = 1 + return + + def _set_pricer(self): + self.pricer = ProjBermudanPricer_SV(model=self.model, N=self.pricer_params.N) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/pricing/fourier/Test_Proj_Sto_Cliquet.py b/tests/pricing/fourier/Test_Proj_Sto_Cliquet.py new file mode 100644 index 0000000..d78e893 --- /dev/null +++ b/tests/pricing/fourier/Test_Proj_Sto_Cliquet.py @@ -0,0 +1,43 @@ +import unittest + +from fypy.pricing.fourier.StochVol.ProjCliquetPricer_SV import ProjCliquetPricer_SV +from Test_StochVol_Helper import GenericTest + + +class Test_Proj_Cliquet(unittest.TestCase, GenericTest): + def __init__(self, methodName="runTest"): + unittest.TestCase.__init__(self, methodName) + GenericTest.__init__(self, option_name="Cliquet") + + def check_equality(self, price, matlab_price): + self.assertAlmostEqual(price, matlab_price, 7) + + def _get_price(self, list_index): + idx_W, idx_T, idx_M = list_index + CG = 0.9 * self.matlab.M[idx_M] * self.C + price = self.pricer.price( + T=self.matlab.T[idx_T], + M=self.matlab.M[idx_M], + W=self.matlab.W[idx_W], + S0=self.mkt.S0, + C=self.C, + CG=CG, + F=self.F, + FG=self.FG, + contract=self.contract, + ) + return price + + def _set_option_constants(self): + self.C = 0.04 + self.F = 0 + self.FG = 0 + self.contract = 3 + return + + def _set_pricer(self): + self.pricer = ProjCliquetPricer_SV(model=self.model, N=self.pricer_params.N) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/pricing/fourier/Test_Proj_Sto_DVSwap.py b/tests/pricing/fourier/Test_Proj_Sto_DVSwap.py new file mode 100644 index 0000000..3706668 --- /dev/null +++ b/tests/pricing/fourier/Test_Proj_Sto_DVSwap.py @@ -0,0 +1,34 @@ +import unittest + +from fypy.pricing.fourier.StochVol.ProjDVSwap_SV import ProjDVSwap_SV +from Test_StochVol_Helper import GenericTest + + +class Test_Proj_DVSwap(unittest.TestCase, GenericTest): + def __init__(self, methodName="runTest"): + unittest.TestCase.__init__(self, methodName) + GenericTest.__init__(self, option_name="DVSwap") + + def check_equality(self, price, matlab_price): + self.assertAlmostEqual(price, matlab_price, 7) + + def _get_price(self, list_index): + idx_W, idx_T, idx_M = list_index + price = self.pricer.price( + T=self.matlab.T[idx_T], + M=self.matlab.M[idx_M], + W=self.matlab.W[idx_W], + contract=self.contract, + ) + return price + + def _set_option_constants(self): + self.contract = 1 + return + + def _set_pricer(self): + self.pricer = ProjDVSwap_SV(model=self.model, N=self.pricer_params.N, L=14) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/pricing/fourier/Test_StochVol_Helper.py b/tests/pricing/fourier/Test_StochVol_Helper.py new file mode 100644 index 0000000..ff4cd40 --- /dev/null +++ b/tests/pricing/fourier/Test_StochVol_Helper.py @@ -0,0 +1,295 @@ +import os +from scipy.io import loadmat as loadmat +import numpy as np +import pandas as pd +from dataclasses import dataclass +from abc import abstractmethod + +from tqdm import tqdm +from itertools import product + +from fypy.termstructures.DiscountCurve import DiscountCurve_ConstRate +from fypy.termstructures.EquityForward import EquityForward +from fypy.model.sv.HestonDEJumps import HestonDEJumps + + +# market parameters (always the same) +@dataclass +class MarketParams: + S0: float = 100 + r: float = 0.05 + q: float = 0 + + +class PricerParams: + def __init__(self, use_P: bool = False): + if use_P: + self.P: int = 5 + self.Pbar: int = 3 + else: + self.N = 2**10 + + +# Storing curves +class Curves: + def __init__(self, mkt: MarketParams): + self.disc_curve = DiscountCurve_ConstRate(rate=mkt.r) + self.div_disc = DiscountCurve_ConstRate(rate=mkt.q) + self.fwd = EquityForward( + S0=mkt.S0, discount=self.disc_curve, divDiscount=self.div_disc + ) + + +# Matlab data +class Matlab: + def __init__(self, option_name: str, barrier: bool = False): + self.script_dir: str = os.path.dirname(os.path.abspath(__file__)) + + file_path = os.path.join( + self.script_dir, + "..", + "..", + "numerical_values_for_testing", + f"{option_name}StoPrices.mat", + ) + + file = loadmat(file_path) + self.prices = file["prices"] + self.W = file["lin_W"].flatten() + self.T = file["lin_T"].flatten() + self.M = file["lin_M"].flatten() + if barrier: + self.H = file["lin_H"].flatten() + self.params = file["params"] + + +# Write in the shell +class Printer: + def __init__(self): + self._colors_code = { + "PURPLE": "\033[95m", + "CYAN": "\033[96m", + "DARKCYAN": "\033[36m", + "BLUE": "\033[94m", + "GREEN": "\033[92m", + "YELLOW": "\033[93m", + "RED": "\033[91m", + "BOLD": "\033[1m", + "UNDERLINE": "\033[4m", + "END": "\033[0m", + "ITALIC": "\u001b[3m", + } + + # write in the shell + def write( + self, text: str, color: str = "END", style: list[str] = [], indent: int = 0 + ): + style_code = "".join([self._colors_code[st] for st in style]) + print( + self._colors_code[color] + + style_code + + indent * "\t" + + text + + self._colors_code["END"] + ) + + # display title + def write_begin(self, option_name: str): + color = "DARKCYAN" + msg = " Test " + option_name + " Stochastic " + hashtag_number = 10 + len_hashtag = len(msg) + 2 * hashtag_number + self.write("\n\n") + self.write(len_hashtag * "#", color=color, indent=3) + self.write( + hashtag_number * "#" + msg + hashtag_number * "#", color=color, indent=3 + ) + self.write(len_hashtag * "#" + "\n\n\n", color=color, indent=3) + + # write the parameters in the shell + def write_parameters( + self, idx_param: int, number_param_set: int, params: np.ndarray + ): + self.write( + f"(*) Parameters set n°{idx_param}/{number_param_set-1}: {list(params)}\n", + indent=1, + style=["BOLD"], + ) + + # if sucess, green message + def write_success(self): + self.write( + "-> Parameters set successfully passed the test. \n", + color="GREEN", + style=["ITALIC"], + indent=4, + ) + + def write_conclusion(self): + self.write( + "Recapitulative DataFrame: \n", + style=["BOLD", "UNDERLINE"], + color="DARKCYAN", + ) + + +# Numeric error logger +class ErrorLog: + def __init__(self, number_param_set: int): + self.number_param_set = number_param_set + self.errors = {i: [] for i in range(number_param_set)} + self.nan_number = {i: 0 for i in range(number_param_set)} + self.option_number = {i: 0 for i in range(number_param_set)} + + # add the error to the list + def add_error(self, param_set_idx: int, error: float): + self.errors[param_set_idx].append(error) + + # count the nan + def add_nan(self, param_set_idx: int): + self.nan_number[param_set_idx] += 1 + + # Maximal error for a whole parameter set + def get_error_max_idx(self, param_set_idx: int): + return np.max(np.abs(self.errors[param_set_idx])) + + # list of the numerical errors when testing a parameter set + def get_error_list(self): + error_list = [] + for i in self.errors: + error_list.append(self.get_error_max_idx(i)) + return error_list + + # build the recap table of the testing procedure + def build_recap_df(self): + error_list = self.get_error_list() + data = { + "Parameters set index": list(range(self.number_param_set)), + "Max Error": error_list, + "NaN Number": list(self.nan_number.values()), + "Option Number": list(self.option_number.values()), + } + df = pd.DataFrame.from_dict(data) + return df + + +class GenericTest: + + def __init__(self, option_name: str): + self.option_name = option_name + + # test function + def test_psv(self): + self._initialization() + for idx_param in range(len(self.matlab.params)): + self._update_pricer(self.matlab.params, idx_param) + self._count_option(idx_param) + self._test_idx_param(idx_param) + self.printer.write_success() + self.display_conclusion() + + # test a set of params + def _test_idx_param(self, idx_param): + + for tuple_index in tqdm(self.product): + list_index = list(tuple_index) + price = self._try_price(idx_param, list_index) + self._check_error_and_type(idx_param, list_index, price) + + # specific option constant such as contract type, floor, etc... + @abstractmethod + def _set_option_constants(self): + raise NotImplementedError + + # compute the price of the option + @abstractmethod + def _get_price(self, list_index): + raise NotImplementedError + + # define a pricer + @abstractmethod + def _set_pricer(self): + raise NotImplementedError + + # count the number of option + def _count_option(self, idx_param: int): + self.error_log.option_number[idx_param] = np.prod( + self.matlab.prices[idx_param].shape + ) + + # try if a price can be computed, toherwize NaN + def _try_price(self, idx_param, list_index): + try: + price = self._get_price(list_index) + except: + price = np.nan + self.error_log.add_nan(idx_param) + return price + + # set the model, here HDKE + def _set_model(self): + self.model = HestonDEJumps(self.curves.fwd, self.curves.disc_curve) + + # display the log error/NaN + def display_conclusion(self): + self.printer.write_conclusion() + df = self.error_log.build_recap_df() + print(df) + + # update the parameters of the pricer when changing the set + def _update_pricer(self, params: np.ndarray, idx_param: int): + param = params[idx_param] + self.printer.write_parameters(idx_param, len(params), param) + self.model.set_params(param) + self._set_pricer() + return + + # initialization of all classes and constants + def _initialization(self): + use_P = self.option_name == "Asian" + is_barrier = self.option_name == "Barrier" + + self.printer = Printer() + self.printer.write_begin(self.option_name) + self.mkt = MarketParams() + self.pricer_params = PricerParams(use_P=use_P) + self.curves = Curves(self.mkt) + self.matlab = Matlab(self.option_name, is_barrier) + self._set_product() + self._set_option_constants() + self.error_log = ErrorLog(len(self.matlab.params)) + self._set_model() + + # check the consistency between Python and Matlab types + def _check_error_and_type(self, idx_param, list_index, price): + index = tuple([idx_param] + list_index) + if np.isnan(price) != np.isnan(self.matlab.prices[index]): + raise ValueError("Python output type should be consistent with Matlab.") + if not np.isnan(price): + self.check_equality(price, self.matlab.prices[index]) + error = price - self.matlab.prices[index] + self.error_log.add_error(idx_param, error) + + def _set_product(self): + match self.option_name: + case "Barrier": + self.product = list( + product( + range(len(self.matlab.W)), + range(len(self.matlab.T)), + range(len(self.matlab.M)), + range(len(self.matlab.H)), + ) + ) + case _: + self.product = list( + product( + range(len(self.matlab.W)), + range(len(self.matlab.T)), + range(len(self.matlab.M)), + ) + ) + + @abstractmethod + def check_equality(self, price: float, matlab_price: float): + raise NotImplementedError diff --git a/tests/pricing/fourier/suite.py b/tests/pricing/fourier/suite.py index 8316f7a..a625d22 100644 --- a/tests/pricing/fourier/suite.py +++ b/tests/pricing/fourier/suite.py @@ -13,4 +13,4 @@ def test_suite(): if __name__ == '__main__': - unittest.TextTestRunner(verbosity=2).run(test_suite()) + unittest.TextTestRunner(verbosity=2).run(test_suite()) \ No newline at end of file