diff --git a/simba/mixins/statistics_mixin.py b/simba/mixins/statistics_mixin.py index b07601b2f..a5f4ed4e5 100644 --- a/simba/mixins/statistics_mixin.py +++ b/simba/mixins/statistics_mixin.py @@ -1,13 +1,16 @@ __author__ = "Simon Nilsson" +from typing import Optional, Tuple, Union + from sklearn.neighbors import LocalOutlierFactor -from typing import Optional, Union, Tuple + try: from typing import Literal except: from typing_extensions import Literal + import numpy as np -from numba import njit, jit, prange, typed, optional, objmode, types +from numba import jit, njit, objmode, optional, prange, typed, types from scipy import stats from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin @@ -33,12 +36,9 @@ class Statistics(FeatureExtractionMixin): def __init__(self): FeatureExtractionMixin.__init__(self) - @staticmethod @jit(nopython=True) - def _hist_1d(data: np.ndarray, - bin_count: int, - range: np.ndarray): + def _hist_1d(data: np.ndarray, bin_count: int, range: np.ndarray): """ Jitted helper to compute 1D histograms with counts. @@ -54,10 +54,10 @@ def _hist_1d(data: np.ndarray, return hist @staticmethod - @njit('(float64[:], float64, float64)', cache=True) - def rolling_independent_sample_t(data: np.ndarray, - time_window: float, - fps: float) -> np.ndarray: + @njit("(float64[:], float64, float64)", cache=True) + def rolling_independent_sample_t( + data: np.ndarray, time_window: float, fps: float + ) -> np.ndarray: """ Jitted compute independent-sample t-statistics for sequentially binned values in a time-series. E.g., compute t-test statistics when comparing ``Feature N`` in the current 1s @@ -86,18 +86,30 @@ def rolling_independent_sample_t(data: np.ndarray, window_size = int(time_window * fps) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) - mean_1, mean_2 = np.mean(data[i-1]), np.mean(data[i]) - stdev_1, stdev_2 = np.std(data[i-1]), np.std(data[i]) - pooled_std = np.sqrt(((len(data[i-1]) - 1) * stdev_1 ** 2 + (len(data[i]) - 1) * stdev_2 ** 2) / (len(data[i-1]) + len(data[i]) - 2)) - results[start:end] = (mean_1 - mean_2) / (pooled_std * np.sqrt(1 / len(data[i-1]) + 1 / len(data[i]))) + start, end = int((cnt + 1) * window_size), int( + ((cnt + 1) * window_size) + window_size + ) + mean_1, mean_2 = np.mean(data[i - 1]), np.mean(data[i]) + stdev_1, stdev_2 = np.std(data[i - 1]), np.std(data[i]) + pooled_std = np.sqrt( + ( + (len(data[i - 1]) - 1) * stdev_1**2 + + (len(data[i]) - 1) * stdev_2**2 + ) + / (len(data[i - 1]) + len(data[i]) - 2) + ) + results[start:end] = (mean_1 - mean_2) / ( + pooled_std * np.sqrt(1 / len(data[i - 1]) + 1 / len(data[i])) + ) return results @staticmethod @jit(nopython=True) - def independent_samples_t(sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[np.ndarray] = None) -> (float, Union[None, bool]): + def independent_samples_t( + sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[np.ndarray] = None, + ) -> (float, Union[None, bool]): """ Jitted compute independent-samples t-test statistic and boolean significance between two distributions. 
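:example: (illustrative sketch, not from the diff; assumes ``numpy`` is available and that this module imports as ``simba.mixins.statistics_mixin`` — the samples below are synthetic placeholders, and per the return annotation the significance flag should come back as ``None`` when no ``critical_values`` table is supplied)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> sample_1 = np.random.normal(loc=10, scale=2, size=10).astype(np.float64)
>>> sample_2 = np.random.normal(loc=12, scale=2, size=10).astype(np.float64)
>>> t_statistic, significance = Statistics().independent_samples_t(sample_1=sample_1, sample_2=sample_2)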
@@ -123,11 +135,18 @@ def independent_samples_t(sample_1: np.ndarray, m1, m2 = np.mean(sample_1), np.mean(sample_2) std_1 = np.sqrt(np.sum((sample_1 - m1) ** 2) / (len(sample_1) - 1)) std_2 = np.sqrt(np.sum((sample_2 - m2) ** 2) / (len(sample_2) - 1)) - pooled_std = np.sqrt(((len(sample_1) - 1) * std_1 ** 2 + (len(sample_2) - 1) * std_2 ** 2) / (len(sample_1) + len(sample_2) - 2)) - t_statistic = (m1 - m2) / (pooled_std * np.sqrt(1 / len(sample_1) + 1 / len(sample_2))) + pooled_std = np.sqrt( + ((len(sample_1) - 1) * std_1**2 + (len(sample_2) - 1) * std_2**2) + / (len(sample_1) + len(sample_2) - 2) + ) + t_statistic = (m1 - m2) / ( + pooled_std * np.sqrt(1 / len(sample_1) + 1 / len(sample_2)) + ) if critical_values is not None: dof = (sample_1.shape[0] + sample_2.shape[0]) - 2 - critical_value = np.interp(dof, critical_values[:, 0], critical_values[:, 1]) + critical_value = np.interp( + dof, critical_values[:, 0], critical_values[:, 1] + ) if critical_value < abs(t_statistic): significance_bool = True else: @@ -135,11 +154,9 @@ def independent_samples_t(sample_1: np.ndarray, return t_statistic, significance_bool - @staticmethod - @njit('(float64[:], float64[:])', cache=True) - def cohens_d(sample_1: np.ndarray, - sample_2: np.ndarray) -> float: + @njit("(float64[:], float64[:])", cache=True) + def cohens_d(sample_1: np.ndarray, sample_2: np.ndarray) -> float: """ Jitted compute of Cohen's d between two distributions @@ -153,14 +170,15 @@ def cohens_d(sample_1: np.ndarray, >>> Statistics().cohens_d(sample_1=sample_1, sample_2=sample_2) >>> -0.5952099775170546 """ - return (np.mean(sample_1) - np.mean(sample_2)) / (np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2)) + return (np.mean(sample_1) - np.mean(sample_2)) / ( + np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2) + ) @staticmethod - @njit('(float64[:], float64[:], float64)', cache=True) - def rolling_cohens_d(data: np.ndarray, - time_windows: np.ndarray, - fps: float) -> np.ndarray: - + @njit("(float64[:], float64[:], float64)", cache=True) + def rolling_cohens_d( + data: np.ndarray, time_windows: np.ndarray, fps: float + ) -> np.ndarray: """ Jitted compute of rolling Cohen's D statistic comparing the current time-window of size N to the preceding window of size N. 
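:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — the ``njit`` signature above expects ``float64`` data and a float ``fps``; the returned array has one row per frame and one column per time window)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> data = np.random.normal(loc=10, scale=2, size=400).astype(np.float64)
>>> results = Statistics().rolling_cohens_d(data=data, time_windows=np.array([1.0]), fps=10.0)  # results.shape == (400, 1)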
@@ -180,21 +198,26 @@ def rolling_cohens_d(data: np.ndarray, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) - d = (np.mean(sample_1) - np.mean(sample_2)) / (np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2)) - results[window_start: window_end, i] = d + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) + d = (np.mean(sample_1) - np.mean(sample_2)) / ( + np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2) + ) + results[window_start:window_end, i] = d return results - @staticmethod - @njit('(float32[:], float64, float64)') - def rolling_two_sample_ks(data: np.ndarray, - time_window: float, - fps: float) -> np.ndarray: + @njit("(float32[:], float64, float64)") + def rolling_two_sample_ks( + data: np.ndarray, time_window: float, fps: float + ) -> np.ndarray: """ Jitted compute Kolmogorov two-sample statistics for sequentially binned values in a time-series. E.g., compute KS statistics when comparing ``Feature N`` in the current 1s time-window, versus ``Feature N`` in the previous 1s time-window. @@ -212,29 +235,42 @@ def rolling_two_sample_ks(data: np.ndarray, window_size, results = int(time_window * fps), np.full((data.shape[0]), -1.0) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) - sample_1, sample_2 = data[i-1], data[i] + start, end = int((cnt + 1) * window_size), int( + ((cnt + 1) * window_size) + window_size + ) + sample_1, sample_2 = data[i - 1], data[i] combined_samples = np.sort(np.concatenate((sample_1, sample_2))) - ecdf_sample_1 = np.searchsorted(sample_1, combined_samples, side='right') / len(sample_1) - ecdf_sample_2 = np.searchsorted(sample_2, combined_samples, side='right') / len(sample_2) + ecdf_sample_1 = np.searchsorted( + sample_1, combined_samples, side="right" + ) / len(sample_1) + ecdf_sample_2 = np.searchsorted( + sample_2, combined_samples, side="right" + ) / len(sample_2) ks = np.max(np.abs(ecdf_sample_1 - ecdf_sample_2)) results[start:end] = ks return results @staticmethod @jit(nopython=True) - def two_sample_ks(sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[bool] = None) -> (float, Union[bool, None]): - + def two_sample_ks( + sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[bool] = None, + ) -> (float, Union[bool, None]): significance_bool = None combined_samples = np.sort(np.concatenate((sample_1, sample_2))) - ecdf_sample_1 = np.searchsorted(sample_1, combined_samples, side='right') / len(sample_1) - ecdf_sample_2 = np.searchsorted(sample_2, combined_samples, side='right') / len(sample_2) + ecdf_sample_1 = np.searchsorted(sample_1, combined_samples, side="right") / len( + sample_1 + ) + ecdf_sample_2 = np.searchsorted(sample_2, combined_samples, side="right") / len( + sample_2 + ) ks = np.max(np.abs(ecdf_sample_1 - ecdf_sample_2)) if critical_values is not None: combined_sample_size = len(sample_1) + 
len(sample_2) - critical_value = np.interp(combined_sample_size, critical_values[:, 0], critical_values[:, 1]) + critical_value = np.interp( + combined_sample_size, critical_values[:, 0], critical_values[:, 1] + ) if critical_value < abs(ks): significance_bool = True else: @@ -244,9 +280,11 @@ def two_sample_ks(sample_1: np.ndarray, @staticmethod @jit(nopython=True) - def one_way_anova(sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[np.ndarray] = None) -> (float, float): + def one_way_anova( + sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[np.ndarray] = None, + ) -> (float, float): """ Jitted compute of one-way ANOVA F statistics and associated p-value for two distributions. @@ -263,15 +301,19 @@ def one_way_anova(sample_1: np.ndarray, significance_bool = True n1, n2 = len(sample_1), len(sample_2) m1, m2 = np.mean(sample_1), np.mean(sample_2) - ss_between = n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + n2 * ( - m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + ss_between = ( + n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + + n2 * (m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + ) ss_within = np.sum((sample_1 - m1) ** 2) + np.sum((sample_2 - m2) ** 2) df_between, df_within = 1, n1 + n2 - 2 ms_between, ms_within = ss_between / df_between, ss_within / df_within f = ms_between / ms_within if critical_values is not None: critical_values = critical_values[:, np.array([0, df_between])] - critical_value = np.interp(df_within, critical_values[:, 0], critical_values[:, 1]) + critical_value = np.interp( + df_within, critical_values[:, 0], critical_values[:, 1] + ) if f > critical_value: significance_bool = True else: @@ -280,11 +322,10 @@ def one_way_anova(sample_1: np.ndarray, return (f, significance_bool) @staticmethod - @njit('(float32[:], float64[:], float64)', cache=True) - def rolling_one_way_anova(data: np.ndarray, - time_windows: np.ndarray, - fps: int) -> np.ndarray: - + @njit("(float32[:], float64[:], float64)", cache=True) + def rolling_one_way_anova( + data: np.ndarray, time_windows: np.ndarray, fps: int + ) -> np.ndarray: """ Jitted compute of rolling one-way ANOVA F-statistic comparing the current time-window of size N to the preceding window of size N. 
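:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — the ``njit`` signature above expects ``float32`` data, ``float64`` time windows, and a float ``fps``)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> data = np.random.normal(loc=10, scale=2, size=400).astype(np.float32)
>>> f_values = Statistics().rolling_one_way_anova(data=data, time_windows=np.array([1.0]), fps=10.0)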
@@ -306,28 +347,38 @@ def rolling_one_way_anova(data: np.ndarray, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) n1, n2 = len(sample_1), len(sample_2) m1, m2 = np.mean(sample_1), np.mean(sample_2) - ss_between = n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + n2 * (m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + ss_between = ( + n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + + n2 * (m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + ) ss_within = np.sum((sample_1 - m1) ** 2) + np.sum((sample_2 - m2) ** 2) df_between, df_within = 1, n1 + n2 - 2 ms_between, ms_within = ss_between / df_between, ss_within / df_within f = ms_between / ms_within - results[window_start: window_end, i] = f + results[window_start:window_end, i] = f return results - def kullback_leibler_divergence(self, - sample_1: np.ndarray, - sample_2: np.ndarray, - fill_value: int = 1, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> float: - + def kullback_leibler_divergence( + self, + sample_1: np.ndarray, + sample_2: np.ndarray, + fill_value: int = 1, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> float: """ Compute Kullback-Leibler divergence between two distributions. 
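:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — as the body below shows, empty histogram bins are replaced with ``fill_value`` before normalization so the entropy term stays finite; the samples are synthetic)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> sample_1 = np.random.normal(loc=10, scale=2, size=1000).astype(np.float64)
>>> sample_2 = np.random.normal(loc=12, scale=3, size=1000).astype(np.float64)
>>> kl = Statistics().kullback_leibler_divergence(sample_1=sample_1, sample_2=sample_2, fill_value=1, bucket_method='auto')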
@@ -341,19 +392,33 @@ def kullback_leibler_divergence(self, """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_1_hist[sample_1_hist == 0] = fill_value + sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum( + sample_1_hist + ), sample_2_hist / np.sum(sample_2_hist) return stats.entropy(pk=sample_1_hist, qk=sample_2_hist) - def rolling_kullback_leibler_divergence(self, - data: np.ndarray, - time_windows: np.ndarray, - fps: int, - fill_value: int = 1, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: - + def rolling_kullback_leibler_divergence( + self, + data: np.ndarray, + time_windows: np.ndarray, + fps: int, + fill_value: int = 1, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> np.ndarray: """ Compute rolling Kullback-Leibler divergence comparing the current time-window of size N to the preceding window of size N. @@ -376,25 +441,43 @@ def rolling_kullback_leibler_divergence(self, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_1_hist[sample_1_hist == 0] = fill_value + sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum( + sample_1_hist + ), sample_2_hist / np.sum(sample_2_hist) kl = stats.entropy(pk=sample_1_hist, qk=sample_2_hist) - results[window_start: window_end, i] = kl + 
results[window_start:window_end, i] = kl return results - def jensen_shannon_divergence(self, - sample_1: np.ndarray, - sample_2: np.ndarray, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> float: - + def jensen_shannon_divergence( + self, + sample_1: np.ndarray, + sample_2: np.ndarray, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> float: """ Compute Jensen-Shannon divergence between two distributions. Useful for (i) measure drift in datasets, and (ii) featurization of distribution shifts across @@ -412,17 +495,31 @@ def jensen_shannon_divergence(self, """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) mean_hist = np.mean([sample_1_hist, sample_2_hist], axis=0) - kl_sample_1, kl_sample_2 = stats.entropy(pk=sample_1_hist, qk=mean_hist), stats.entropy(pk=sample_2_hist, qk=mean_hist) + kl_sample_1, kl_sample_2 = stats.entropy( + pk=sample_1_hist, qk=mean_hist + ), stats.entropy(pk=sample_2_hist, qk=mean_hist) return (kl_sample_1 + kl_sample_2) / 2 - def rolling_jensen_shannon_divergence(self, - data: np.ndarray, - time_windows: np.ndarray, - fps = int, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: + def rolling_jensen_shannon_divergence( + self, + data: np.ndarray, + time_windows: np.ndarray, + fps=int, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> np.ndarray: """ Compute rolling Jensen-Shannon divergence comparing the current time-window of size N to the preceding window of size N. 
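:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — note that the ``fps=int`` default in the signature above is the ``int`` type itself, so a real frame rate should always be passed explicitly)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> data = np.random.normal(loc=10, scale=2, size=900).astype(np.float64)
>>> js = Statistics().rolling_jensen_shannon_divergence(data=data, time_windows=np.array([1.0]), fps=30, bucket_method='auto')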
@@ -436,27 +533,45 @@ def rolling_jensen_shannon_divergence(self, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_1_hist, sample_2_hist = sample_1_hist / np.sum( + sample_1_hist + ), sample_2_hist / np.sum(sample_2_hist) mean_hist = np.mean([sample_1_hist, sample_2_hist], axis=0) - kl_sample_1, kl_sample_2 = stats.entropy(pk=sample_1_hist, qk=mean_hist), stats.entropy(pk=sample_2_hist, qk=mean_hist) + kl_sample_1, kl_sample_2 = stats.entropy( + pk=sample_1_hist, qk=mean_hist + ), stats.entropy(pk=sample_2_hist, qk=mean_hist) js = (kl_sample_1 + kl_sample_2) / 2 - results[window_start: window_end, i] = js + results[window_start:window_end, i] = js return results - - def wasserstein_distance(self, - sample_1: np.ndarray, - sample_2: np.ndarray, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> float: - + def wasserstein_distance( + self, + sample_1: np.ndarray, + sample_2: np.ndarray, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> float: """ Compute Wasserstein distance between two distributions. 
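:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — per the body below, both samples are first bucketed into normalized histograms before ``scipy.stats.wasserstein_distance`` is applied)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> sample_1 = np.random.normal(loc=10, scale=2, size=1000).astype(np.float64)
>>> sample_2 = np.random.normal(loc=10, scale=3, size=1000).astype(np.float64)
>>> w = Statistics().wasserstein_distance(sample_1=sample_1, sample_2=sample_2, bucket_method='auto')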
@@ -478,18 +593,32 @@ def wasserstein_distance(self, """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) - return stats.wasserstein_distance(u_values=sample_1_hist, v_values=sample_2_hist) - - - def rolling_wasserstein_distance(self, - data: np.ndarray, - time_windows: np.ndarray, - fps: int, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: - + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_1_hist, sample_2_hist = sample_1_hist / np.sum( + sample_1_hist + ), sample_2_hist / np.sum(sample_2_hist) + return stats.wasserstein_distance( + u_values=sample_1_hist, v_values=sample_2_hist + ) + + def rolling_wasserstein_distance( + self, + data: np.ndarray, + time_windows: np.ndarray, + fps: int, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> np.ndarray: """ Compute rolling Wasserstein distance comparing the current time-window of size N to the preceding window of size N. @@ -504,25 +633,45 @@ def rolling_wasserstein_distance(self, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) - w = stats.wasserstein_distance(u_values=sample_1_hist, v_values=sample_2_hist) - results[window_start: window_end, i] = w + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_1_hist, sample_2_hist = sample_1_hist / np.sum( + sample_1_hist + ), sample_2_hist / np.sum(sample_2_hist) + w = stats.wasserstein_distance( + u_values=sample_1_hist, v_values=sample_2_hist + ) + results[window_start:window_end, i] = w return results - def population_stability_index(self, - sample_1: np.ndarray, - sample_2: np.ndarray, - fill_value: int = 1, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 
'auto') -> float: + def population_stability_index( + self, + sample_1: np.ndarray, + sample_2: np.ndarray, + fill_value: int = 1, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> float: """ Compute Population Stability Index (PSI) comparing two distributions. @@ -538,20 +687,35 @@ def population_stability_index(self, """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_1_hist[sample_1_hist == 0] = fill_value + sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum( + sample_1_hist + ), sample_2_hist / np.sum(sample_2_hist) samples_diff = sample_2_hist - sample_1_hist log = np.log(sample_2_hist / sample_1_hist) return np.sum(samples_diff * log) - def rolling_population_stability_index(self, - data: np.ndarray, - time_windows: np.ndarray, - fps: int, - fill_value: int = 1, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: + def rolling_population_stability_index( + self, + data: np.ndarray, + time_windows: np.ndarray, + fps: int, + fill_value: int = 1, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ) -> np.ndarray: """ Compute rolling Population Stability Index (PSI) comparing the current time-window of size N to the preceding window of size N. 
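:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — as in the non-rolling variant, empty bins are replaced with ``fill_value`` before the log-ratio is taken, and the result has one column per time window)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> data = np.random.randint(0, 100, (3000,)).astype(np.float64)
>>> psi = Statistics().rolling_population_stability_index(data=data, time_windows=np.array([1.0]), fps=30, fill_value=1)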
@@ -569,29 +733,41 @@ def rolling_population_stability_index(self, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d( + data=sample_1, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_2_hist = self._hist_1d( + data=sample_2, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + sample_1_hist[sample_1_hist == 0] = fill_value + sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum( + sample_1_hist + ), sample_2_hist / np.sum(sample_2_hist) samples_diff = sample_2_hist - sample_1_hist log = np.log(sample_2_hist / sample_1_hist) psi = np.sum(samples_diff * log) - results[window_start: window_end, i] = psi + results[window_start:window_end, i] = psi return results - @staticmethod - @njit('(float64[:], float64[:])', cache=True) - def kruskal_wallis(sample_1: np.ndarray, - sample_2: np.ndarray) -> float: - + @njit("(float64[:], float64[:])", cache=True) + def kruskal_wallis(sample_1: np.ndarray, sample_2: np.ndarray) -> float: """ Jitted compute of Kruskal-Wallis H between two distributions. 
@@ -605,23 +781,23 @@ def kruskal_wallis(sample_1: np.ndarray, >>> results = Statistics().kruskal_wallis(sample_1=sample_1, sample_2=sample_2) """ - #sample_1 = np.concatenate((np.zeros((sample_1.shape[0], 1)), sample_1.reshape(-1, 1)), axis=1) - #sample_2 = np.concatenate((np.ones((sample_2.shape[0], 1)), sample_2.reshape(-1, 1)), axis=1) + # sample_1 = np.concatenate((np.zeros((sample_1.shape[0], 1)), sample_1.reshape(-1, 1)), axis=1) + # sample_2 = np.concatenate((np.ones((sample_2.shape[0], 1)), sample_2.reshape(-1, 1)), axis=1) data = np.vstack((sample_1, sample_2)) ranks = fast_mean_rank(data=data[:, 1], descending=False) data = np.hstack((data, ranks.reshape(-1, 1))) - sample_1_summed_rank = np.sum(data[0: sample_1.shape[0], 2].flatten()) - sample_2_summed_rank = np.sum(data[sample_1.shape[0]:, 2].flatten()) + sample_1_summed_rank = np.sum(data[0 : sample_1.shape[0], 2].flatten()) + sample_2_summed_rank = np.sum(data[sample_1.shape[0] :, 2].flatten()) h1 = 12 / (data.shape[0] * (data.shape[0] + 1)) - h2 = (np.square(sample_1_summed_rank) / sample_1.shape[0]) + (np.square(sample_2_summed_rank) / sample_2.shape[0]) + h2 = (np.square(sample_1_summed_rank) / sample_1.shape[0]) + ( + np.square(sample_2_summed_rank) / sample_2.shape[0] + ) h3 = 3 * (data.shape[0] + 1) - return h1 * h2 - h3 + return h1 * h2 - h3 @staticmethod - @njit('(float64[:], float64[:])', cache=True) - def mann_whitney(sample_1: np.ndarray, - sample_2: np.ndarray) -> float: - + @njit("(float64[:], float64[:])", cache=True) + def mann_whitney(sample_1: np.ndarray, sample_2: np.ndarray) -> float: """ Jitted compute of Mann-Whitney U between two distributions. @@ -644,13 +820,11 @@ def mann_whitney(sample_1: np.ndarray, u2 = n1 * n2 - u1 return min(u1, u2) - @staticmethod @jit(nopython=True, cache=True) - def levenes(sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: np.ndarray) -> (float, Union[bool, None]): - + def levenes( + sample_1: np.ndarray, sample_2: np.ndarray, critical_values: np.ndarray + ) -> (float, Union[bool, None]): """ Jitted compute of two-sample Leven's W. 
@@ -675,10 +849,14 @@ def levenes(sample_1: np.ndarray, Ni_x, Ni_y = len(sample_1), len(sample_2) Yci_x, Yci_y = np.median(sample_1), np.median(sample_2) Ntot = Ni_x + Ni_y - Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs(sample_2 - Yci_y).astype(np.float32) + Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs( + sample_2 - Yci_y + ).astype(np.float32) Zbari_x, Zbari_y = np.mean(Zij_x), np.mean(Zij_y) Zbar = ((Zbari_x * Ni_x) + (Zbari_y * Ni_y)) / Ntot - numer = (Ntot - 2) * np.sum(np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2) + numer = (Ntot - 2) * np.sum( + np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2 + ) dvar = np.sum((Zij_x - Zbari_x) ** 2) + np.sum((Zij_y - Zbari_y) ** 2) denom = (2 - 1.0) * dvar l_statistic = numer / denom @@ -687,7 +865,9 @@ def levenes(sample_1: np.ndarray, dfn, dfd = 1, (Ni_x + Ni_y) - 2 idx = (np.abs(critical_values[0][1:] - dfd)).argmin() + 1 critical_values = critical_values[1:, np.array([0, idx])] - critical_value = np.interp(dfd, critical_values[:, 0], critical_values[:, 1]) + critical_value = np.interp( + dfd, critical_values[:, 0], critical_values[:, 1] + ) if l_statistic >= critical_value: significance_bool = True else: @@ -696,11 +876,8 @@ def levenes(sample_1: np.ndarray, return (l_statistic, significance_bool) @staticmethod - @njit('(float64[:], float64[:], float64)', cache=True) - def rolling_levenes(data: np.ndarray, - time_windows: np.ndarray, - fps: float): - + @njit("(float64[:], float64[:], float64)", cache=True) + def rolling_levenes(data: np.ndarray, time_windows: np.ndarray, fps: float): """ Jitted compute of rolling Levene's W comparing the current time-window of size N to the preceding window of size N. @@ -719,29 +896,35 @@ def rolling_levenes(data: np.ndarray, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) Ni_x, Ni_y = len(sample_1), len(sample_2) Yci_x, Yci_y = np.median(sample_1), np.median(sample_2) Ntot = Ni_x + Ni_y - Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs(sample_2 - Yci_y).astype(np.float32) + Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs( + sample_2 - Yci_y + ).astype(np.float32) Zbari_x, Zbari_y = np.mean(Zij_x), np.mean(Zij_y) Zbar = ((Zbari_x * Ni_x) + (Zbari_y * Ni_y)) / Ntot - numer = (Ntot - 2) * np.sum(np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2) + numer = (Ntot - 2) * np.sum( + np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2 + ) dvar = np.sum((Zij_x - Zbari_x) ** 2) + np.sum((Zij_y - Zbari_y) ** 2) denom = (2 - 1.0) * dvar w = numer / denom - results[window_start: window_end, i] = w + results[window_start:window_end, i] = w return results - @staticmethod @jit(nopython=True, cache=True) - def brunner_munzel(sample_1: np.ndarray, - sample_2: np.ndarray) -> float: + def brunner_munzel(sample_1: np.ndarray, sample_2: np.ndarray) -> float: """ Jitted compute of Brunner-Munzel W between 
two distributions. @@ -759,7 +942,7 @@ def brunner_munzel(sample_1: np.ndarray, """ nx, ny = len(sample_1), len(sample_2) rankc = fast_mean_rank(np.concatenate((sample_1, sample_2))) - rankcx, rankcy = rankc[0:nx], rankc[nx:nx + ny] + rankcx, rankcy = rankc[0:nx], rankc[nx : nx + ny] rankcx_mean, rankcy_mean = np.mean(rankcx), np.mean(rankcy) rankx, ranky = fast_mean_rank(sample_1), fast_mean_rank(sample_2) rankx_mean, ranky_mean = np.mean(rankx), np.mean(ranky) @@ -769,38 +952,42 @@ def brunner_munzel(sample_1: np.ndarray, wbfn /= (nx + ny) * np.sqrt(nx * Sx + ny * Sy) return -wbfn - @staticmethod - @njit('(float32[:], float64[:], float64)', cache=True) - def rolling_barletts_test(data: np.ndarray, - time_windows: np.ndarray, - fps: float): - + @njit("(float32[:], float64[:], float64)", cache=True) + def rolling_barletts_test(data: np.ndarray, time_windows: np.ndarray, fps: float): results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) n_1 = len(sample_1) n_2 = len(sample_2) N = n_1 + n_2 - mean_variance_1 = np.sum((sample_1 - np.mean(sample_1)) ** 2) / (n_1 - 1) - mean_variance_2 = np.sum((sample_2 - np.mean(sample_2)) ** 2) / (n_2 - 1) - numerator = (N - 2) * (np.log(mean_variance_1) + np.log(mean_variance_2)) + mean_variance_1 = np.sum((sample_1 - np.mean(sample_1)) ** 2) / ( + n_1 - 1 + ) + mean_variance_2 = np.sum((sample_2 - np.mean(sample_2)) ** 2) / ( + n_2 - 1 + ) + numerator = (N - 2) * ( + np.log(mean_variance_1) + np.log(mean_variance_2) + ) denominator = 1 / (n_1 - 1) + 1 / (n_2 - 1) u = numerator / denominator - results[window_start: window_end, i] = u + results[window_start:window_end, i] = u return results @staticmethod - @njit('(float32[:], float32[:])') - def pearsons_r(sample_1: np.ndarray, - sample_2: np.ndarray): - + @njit("(float32[:], float32[:])") + def pearsons_r(sample_1: np.ndarray, sample_2: np.ndarray): """ :example: >>> sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float32) @@ -810,15 +997,15 @@ def pearsons_r(sample_1: np.ndarray, m1, m2 = np.mean(sample_1), np.mean(sample_2) numerator = np.sum((sample_1 - m1) * (sample_2 - m2)) - denominator = np.sqrt(np.sum((sample_1 - m1) ** 2) * np.sum((sample_2 - m2) ** 2)) + denominator = np.sqrt( + np.sum((sample_1 - m1) ** 2) * np.sum((sample_2 - m2) ** 2) + ) r = numerator / denominator return r @staticmethod - @njit('(float32[:], float32[:])') - def spearman_rank_correlation(sample_1: np.ndarray, - sample_2: np.ndarray): - + @njit("(float32[:], float32[:])") + def spearman_rank_correlation(sample_1: np.ndarray, sample_2: np.ndarray): """ :example: >>> sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float32) @@ -827,17 +1014,17 @@ def spearman_rank_correlation(sample_1: np.ndarray, >>> 0.0003979206085205078 """ - rank_x, rank_y = np.argsort(np.argsort(sample_1)), np.argsort(np.argsort(sample_2)) + rank_x, rank_y = np.argsort(np.argsort(sample_1)), np.argsort( + np.argsort(sample_2) + ) d_squared = np.sum((rank_x - rank_y) ** 2) 
return 1 - (6 * d_squared) / (len(sample_1) * (len(sample_2) ** 2 - 1)) - @staticmethod - @njit('(float32[:], float32[:], float64[:], int64)') - def sliding_pearsons_r(sample_1: np.ndarray, - sample_2: np.ndarray, - time_windows: np.ndarray, - fps: int) -> np.ndarray: + @njit("(float32[:], float32[:], float64[:], int64)") + def sliding_pearsons_r( + sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: int + ) -> np.ndarray: """ Given two 1D arrays of size N, create sliding window of size time_windows[i] * fps and return Pearson's R between the values in the two 1D arrays in each window. Address "what is the correlation between Feature 1 and @@ -860,18 +1047,20 @@ def sliding_pearsons_r(sample_1: np.ndarray, >>> [[-1.][-1.][-1.][-1.][0.227][-0.319][-0.196][0.474][-0.061][0.713]] """ - results = np.full((sample_1.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - for left, right in zip(prange(0, sample_1.shape[0]+1), prange(window_size, sample_1.shape[0]+1)): + for left, right in zip( + prange(0, sample_1.shape[0] + 1), + prange(window_size, sample_1.shape[0] + 1), + ): s1, s2 = sample_1[left:right], sample_2[left:right] m1, m2 = np.mean(s1), np.mean(s2) numerator = np.sum((s1 - m1) * (s2 - m2)) denominator = np.sqrt(np.sum((s1 - m1) ** 2) * np.sum((s2 - m2) ** 2)) if denominator != 0: r = numerator / denominator - results[right-1, i] = r + results[right - 1, i] = r else: results[right - 1, i] = -1.0 @@ -879,11 +1068,12 @@ def sliding_pearsons_r(sample_1: np.ndarray, @staticmethod @jit(nopython=True) - def chi_square(sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[np.ndarray] = None, - type: Optional[str] = 'goodness_of_fit') -> Tuple[float, Union[bool, None]]: - + def chi_square( + sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[np.ndarray] = None, + type: Optional[str] = "goodness_of_fit", + ) -> Tuple[float, Union[bool, None]]: """ Jitted compute of chi square between two categorical distributions. @@ -909,8 +1099,9 @@ def chi_square(sample_1: np.ndarray, sample_2_counts = np.zeros(len(unique_categories), dtype=np.int64) for i in prange(len(unique_categories)): - sample_1_counts[i], sample_2_counts[i] = np.sum(sample_1 == unique_categories[i]), np.sum( - sample_2 == unique_categories[i]) + sample_1_counts[i], sample_2_counts[i] = np.sum( + sample_1 == unique_categories[i] + ), np.sum(sample_2 == unique_categories[i]) for i in prange(len(unique_categories)): count_1, count_2 = sample_1_counts[i], sample_2_counts[i] @@ -920,7 +1111,7 @@ def chi_square(sample_1: np.ndarray, chi_square += ((count_1 - count_2) ** 2) / (count_2 + 1) if critical_values is not None: - if type == 'goodness_of_fit': + if type == "goodness_of_fit": df = unique_categories.shape[0] - 1 else: df = (len(sample_1_counts) - 1) * (len(sample_2_counts) - 1) @@ -933,14 +1124,15 @@ def chi_square(sample_1: np.ndarray, return chi_square, significance_bool - @staticmethod - @njit('(float32[:], float32, float32, float32[:,:], float32)') - def sliding_independent_samples_t(data: np.ndarray, - time_window: float, - slide_time: float, - critical_values: np.ndarray, - fps: float) -> np.ndarray: + @njit("(float32[:], float32, float32, float32[:,:], float32)") + def sliding_independent_samples_t( + data: np.ndarray, + time_window: float, + slide_time: float, + critical_values: np.ndarray, + fps: float, + ) -> np.ndarray: """ Jitted compute of sliding independent sample t-test. 
Compares the feature values in current time-window to prior time-windows to find the length in time to the most recent time-window where a significantly different @@ -967,33 +1159,38 @@ def sliding_independent_samples_t(data: np.ndarray, results = np.full((data.shape[0]), 0.0) window_size, slide_size = int(time_window * fps), int(slide_time * fps) for i in range(1, data.shape[0]): - sample_1_left, sample_1_right = i, i+window_size - sample_2_left, sample_2_right = sample_1_left-slide_size, sample_1_right-slide_size + sample_1_left, sample_1_right = i, i + window_size + sample_2_left, sample_2_right = ( + sample_1_left - slide_size, + sample_1_right - slide_size, + ) sample_1 = data[sample_1_left:sample_1_right] dof, steps_taken = (sample_1.shape[0] + sample_1.shape[0]) - 2, 1 while sample_2_left >= 0: sample_2 = data[sample_2_left:sample_2_right] - t_statistic = (np.mean(sample_1) - np.mean(sample_2)) / np.sqrt((np.std(sample_1) / sample_1.shape[0]) + (np.std(sample_2) / sample_1.shape[0])) - critical_val = critical_values[dof-1][1] + t_statistic = (np.mean(sample_1) - np.mean(sample_2)) / np.sqrt( + (np.std(sample_1) / sample_1.shape[0]) + + (np.std(sample_2) / sample_1.shape[0]) + ) + critical_val = critical_values[dof - 1][1] if t_statistic >= critical_val: break else: - sample_2_left -= 1; sample_2_right -= 1; steps_taken += 1 + sample_2_left -= 1 + sample_2_right -= 1 + steps_taken += 1 if sample_2_left < 0: steps_taken = -1 if steps_taken == -1: results[i + window_size] = -1 else: - results[i+window_size] = steps_taken * slide_time + results[i + window_size] = steps_taken * slide_time return results @staticmethod - @njit('(float32[:], float64[:], float32)') - def rolling_mann_whitney(data: np.ndarray, - time_windows: np.ndarray, - fps: float): - + @njit("(float32[:], float64[:], float32)") + def rolling_mann_whitney(data: np.ndarray, time_windows: np.ndarray, fps: float): """ Jitted compute of rolling Mann-Whitney U comparing the current time-window of size N to the preceding window of size N. 
@@ -1015,36 +1212,35 @@ def rolling_mann_whitney(data: np.ndarray, results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) + data_split = np.split( + data, list(range(window_size, data.shape[0], window_size)) + ) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ + j + ].astype(np.float32) n1, n2 = sample_1.shape[0], sample_2.shape[0] ranked = fast_mean_rank(np.concatenate((sample_1, sample_2))) u1 = n1 * n2 + (n1 * (n1 + 1)) / 2.0 - np.sum(ranked[:n1], axis=0) u2 = n1 * n2 - u1 u = min(u1, u2) - results[window_start: window_end, i] = u + results[window_start:window_end, i] = u return results - def chow_test(self): pass def adf_test(self): pass - @staticmethod - @njit('(float32[:], float32[:], float64[:], int64)') - def sliding_spearman_rank_correlation(sample_1: np.ndarray, - sample_2: np.ndarray, - time_windows: np.ndarray, - fps: int) -> np.ndarray: - - + @njit("(float32[:], float32[:], float64[:], int64)") + def sliding_spearman_rank_correlation( + sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: int + ) -> np.ndarray: """ Given two 1D arrays of size N, create sliding window of size time_windows[i] * fps and return Spearman's rank correlation between the values in the two 1D arrays in each window. Address "what is the correlation between Feature 1 and @@ -1070,22 +1266,23 @@ def sliding_spearman_rank_correlation(sample_1: np.ndarray, results = np.full((sample_1.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - for left, right in zip(prange(0, sample_1.shape[0] + 1), prange(window_size, sample_1.shape[0] + 1)): + for left, right in zip( + prange(0, sample_1.shape[0] + 1), + prange(window_size, sample_1.shape[0] + 1), + ): s1, s2 = sample_1[left:right], sample_2[left:right] rank_x, rank_y = np.argsort(np.argsort(s1)), np.argsort(np.argsort(s2)) d_squared = np.sum((rank_x - rank_y) ** 2) s = 1 - (6 * d_squared) / (len(s1) * (len(s2) ** 2 - 1)) - results[right-1, i] = s + results[right - 1, i] = s return results - @staticmethod - @njit('(float32[:], float64, float64, float64)') - def sliding_autocorrelation(data: np.ndarray, - max_lag: float, - time_window: float, - fps: float): + @njit("(float32[:], float64, float64, float64)") + def sliding_autocorrelation( + data: np.ndarray, max_lag: float, time_window: float, fps: float + ): """ Jitted compute of sliding auto-correlations (the correlation of a feature with itself using lagged windows). 
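:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — ``max_lag`` and ``time_window`` are given in seconds and converted to frame counts with ``fps``, and the ``njit`` signature above expects ``float32`` data with float scalars)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> data = np.random.normal(loc=10, scale=2, size=1000).astype(np.float32)
>>> acorr = Statistics().sliding_autocorrelation(data=data, max_lag=0.5, time_window=2.0, fps=10.0)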
@@ -1097,9 +1294,9 @@ def sliding_autocorrelation(data: np.ndarray, max_frm_lag, time_window_frms = int(max_lag * fps), int(time_window * fps) results = np.full((data.shape[0]), 0.0) - for right in prange(time_window_frms-1, data.shape[0]): - left = right - time_window_frms+1 - w_data = data[left:right+1] + for right in prange(time_window_frms - 1, data.shape[0]): + left = right - time_window_frms + 1 + w_data = data[left : right + 1] corrcfs = np.full((max_frm_lag), np.nan) corrcfs[0] = 1 for shift in range(1, max_frm_lag): @@ -1108,63 +1305,65 @@ def sliding_autocorrelation(data: np.ndarray, const = np.ones_like(corrcfs) mat_[:, 0] = const mat_[:, 1] = corrcfs - det_ = np.linalg.lstsq(mat_.astype(np.float32), np.arange(0, max_frm_lag).astype(np.float32))[0] + det_ = np.linalg.lstsq( + mat_.astype(np.float32), np.arange(0, max_frm_lag).astype(np.float32) + )[0] results[right] = det_[::-1][0] return results @staticmethod - def dominant_frequencies(data: np.ndarray, - fps: float, - k: int, - window_function: Literal['Hann', 'Hamming', 'Blackman'] = None): - - """ Find the K dominant frequencies within a feature vector """ - - if window_function == 'Hann': + def dominant_frequencies( + data: np.ndarray, + fps: float, + k: int, + window_function: Literal["Hann", "Hamming", "Blackman"] = None, + ): + """Find the K dominant frequencies within a feature vector""" + + if window_function == "Hann": data = data * np.hanning(len(data)) - elif window_function == 'Hamming': + elif window_function == "Hamming": data = data * np.hamming(len(data)) - elif window_function == 'Blackman': + elif window_function == "Blackman": data = data * np.blackman(len(data)) fft_result = np.fft.fft(data) frequencies = np.fft.fftfreq(data.shape[0], 1 / fps) magnitude = np.abs(fft_result) - return frequencies[np.argsort(magnitude)[-(k + 1):-1]] + return frequencies[np.argsort(magnitude)[-(k + 1) : -1]] @staticmethod - def sliding_dominant_frequencies(data: np.ndarray, - fps: float, - k: int, - time_windows: np.ndarray, - window_function: Literal['Hann', 'Hamming', 'Blackman'] = None): - - """ Find the K dominant frequencies within a feature vector using sliding windows """ - + def sliding_dominant_frequencies( + data: np.ndarray, + fps: float, + k: int, + time_windows: np.ndarray, + window_function: Literal["Hann", "Hamming", "Blackman"] = None, + ): + """Find the K dominant frequencies within a feature vector using sliding windows""" results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for time_window_cnt in range(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for left, right in zip(range(0, data.shape[0] + 1), range(window_size, data.shape[0] + 1)): + for left, right in zip( + range(0, data.shape[0] + 1), range(window_size, data.shape[0] + 1) + ): window_data = data[left:right] - if window_function == 'Hann': + if window_function == "Hann": window_data = window_data * np.hanning(len(window_data)) - elif window_function == 'Hamming': + elif window_function == "Hamming": window_data = window_data * np.hamming(len(window_data)) - elif window_function == 'Blackman': + elif window_function == "Blackman": window_data = window_data * np.blackman(len(window_data)) fft_result = np.fft.fft(window_data) frequencies = np.fft.fftfreq(window_data.shape[0], 1 / fps) magnitude = np.abs(fft_result) - top_k_frequency = frequencies[np.argsort(magnitude)[-(k + 1):-1]] - results[right-1][time_window_cnt] = top_k_frequency[0] + top_k_frequency = frequencies[np.argsort(magnitude)[-(k + 1) : -1]] + 
results[right - 1][time_window_cnt] = top_k_frequency[0] return results - @staticmethod - @njit('(float32[:], float32[:])') - def kendall_tau(sample_1: np.ndarray, - sample_2: np.ndarray) -> Tuple[float, float]: - + @njit("(float32[:], float32[:])") + def kendall_tau(sample_1: np.ndarray, sample_2: np.ndarray) -> Tuple[float, float]: """ Jitted Kendall Tau (rank correlation coefficient). Non-parametric method for computing correlation between two time-series features. Returns tau and associated z-score. @@ -1186,43 +1385,66 @@ def kendall_tau(sample_1: np.ndarray, rnks = np.argsort(sample_1) s1_rnk, s2_rnk = sample_1[rnks], sample_2[rnks] - cncrdnt_cnts, dscrdnt_cnts = np.full((s1_rnk.shape[0] - 1), np.nan), np.full((s1_rnk.shape[0] - 1), np.nan) + cncrdnt_cnts, dscrdnt_cnts = np.full((s1_rnk.shape[0] - 1), np.nan), np.full( + (s1_rnk.shape[0] - 1), np.nan + ) for i in range(s2_rnk.shape[0] - 1): - cncrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] > s2_rnk[i]).flatten().shape[0] - dscrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] < s2_rnk[i]).flatten().shape[0] - t = (np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts)) / (np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts)) - z = 3 * t * (np.sqrt(s1_rnk.shape[0] * (s1_rnk.shape[0] - 1))) / np.sqrt(2 * ((2 * s1_rnk.shape[0]) + 5)) + cncrdnt_cnts[i] = ( + np.argwhere(s2_rnk[i + 1 :] > s2_rnk[i]).flatten().shape[0] + ) + dscrdnt_cnts[i] = ( + np.argwhere(s2_rnk[i + 1 :] < s2_rnk[i]).flatten().shape[0] + ) + t = (np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts)) / ( + np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts) + ) + z = ( + 3 + * t + * (np.sqrt(s1_rnk.shape[0] * (s1_rnk.shape[0] - 1))) + / np.sqrt(2 * ((2 * s1_rnk.shape[0]) + 5)) + ) return t, z - @staticmethod - @njit('(float32[:], float32[:], float64[:], int64)') - def sliding_kendall_tau(sample_1: np.ndarray, - sample_2: np.ndarray, - time_windows: np.ndarray, - fps: float) -> np.ndarray: - + @njit("(float32[:], float32[:], float64[:], int64)") + def sliding_kendall_tau( + sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: float + ) -> np.ndarray: results = np.full((sample_1.shape[0], time_windows.shape[0]), 0.0) for time_window_cnt in range(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for left, right in zip(range(0, sample_1.shape[0] + 1), range(window_size, sample_1.shape[0] + 1)): - sliced_sample_1, sliced_sample_2 = sample_1[left:right], sample_2[left:right] + for left, right in zip( + range(0, sample_1.shape[0] + 1), + range(window_size, sample_1.shape[0] + 1), + ): + sliced_sample_1, sliced_sample_2 = ( + sample_1[left:right], + sample_2[left:right], + ) rnks = np.argsort(sliced_sample_1) s1_rnk, s2_rnk = sliced_sample_1[rnks], sliced_sample_2[rnks] - cncrdnt_cnts, dscrdnt_cnts = np.full((s1_rnk.shape[0] - 1), np.nan), np.full((s1_rnk.shape[0] - 1), np.nan) + cncrdnt_cnts, dscrdnt_cnts = np.full( + (s1_rnk.shape[0] - 1), np.nan + ), np.full((s1_rnk.shape[0] - 1), np.nan) for i in range(s2_rnk.shape[0] - 1): - cncrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] > s2_rnk[i]).flatten().shape[0] - dscrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] < s2_rnk[i]).flatten().shape[0] - results[right][time_window_cnt] = (np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts)) / (np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts)) + cncrdnt_cnts[i] = ( + np.argwhere(s2_rnk[i + 1 :] > s2_rnk[i]).flatten().shape[0] + ) + dscrdnt_cnts[i] = ( + np.argwhere(s2_rnk[i + 1 :] < s2_rnk[i]).flatten().shape[0] + ) + results[right][time_window_cnt] = ( + np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts) + ) / 
(np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts)) return results @staticmethod - def local_outlier_factor(data: np.ndarray, - k: Union[int, float] = 5, - contamination: float = 1e-10) -> np.ndarray: - + def local_outlier_factor( + data: np.ndarray, k: Union[int, float] = 5, contamination: float = 1e-10 + ) -> np.ndarray: """ Compute the local outlier factor of each observation. @@ -1251,7 +1473,9 @@ def local_outlier_factor(data: np.ndarray, @staticmethod @jit(nopython=True) - def _hbos_compute(data: np.ndarray, histograms: typed.Dict, histogram_edges: typed.Dict) -> np.ndarray: + def _hbos_compute( + data: np.ndarray, histograms: typed.Dict, histogram_edges: typed.Dict + ) -> np.ndarray: """ Jitted helper to compute Histogram-based Outlier Score (HBOS) called by ``simba.mixins.statistics_mixin.Statistics.hbos``. @@ -1277,10 +1501,13 @@ def _hbos_compute(data: np.ndarray, histograms: typed.Dict, histogram_edges: typ results[i] = score return results - def hbos(self, - data: np.ndarray, - bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto'): - + def hbos( + self, + data: np.ndarray, + bucket_method: Literal[ + "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" + ] = "auto", + ): """ Jitted compute of Histogram-based Outlier Scores (HBOS). HBOS quantifies the abnormality of data points based on the densities of their feature values within their respective buckets over all feature values. @@ -1302,20 +1529,29 @@ def hbos(self, min_vals, max_vals = np.min(data, axis=0), np.max(data, axis=0) data = (data - min_vals) / (max_vals - min_vals) * (1 - 0) + 0 - histogram_edges = typed.Dict.empty(key_type=types.int64, value_type=types.float64[:]) + histogram_edges = typed.Dict.empty( + key_type=types.int64, value_type=types.float64[:] + ) histograms = typed.Dict.empty(key_type=types.int64, value_type=types.int64[:]) for i in range(data.shape[1]): bin_width, bin_count = bucket_data(data=data, method=bucket_method) - histograms[i] = self._hist_1d(data=data, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) - histogram_edges[i] = np.arange(0, 1 + bin_width, bin_width).astype(np.float64) - - results = self._hbos_compute(data=data, histograms=histograms, histogram_edges=histogram_edges) + histograms[i] = self._hist_1d( + data=data, + bin_count=bin_count, + range=np.array([0, int(bin_width * bin_count)]), + ) + histogram_edges[i] = np.arange(0, 1 + bin_width, bin_width).astype( + np.float64 + ) + + results = self._hbos_compute( + data=data, histograms=histograms, histogram_edges=histogram_edges + ) return results - def rolling_shapiro_wilks(self, - data: np.ndarray, - time_window: float, - fps: int) -> np.ndarray: + def rolling_shapiro_wilks( + self, data: np.ndarray, time_window: float, fps: int + ) -> np.ndarray: """ Compute Shapiro-Wilks normality statistics for sequentially binned values in a time-series. E.g., compute the normality statistics of ``Feature N`` in each window of ``time_window`` seconds. 
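:example: (illustrative sketch, not from the diff; assumes ``numpy`` and this module are importable — each ``time_window`` bin is scored with ``scipy.stats.shapiro`` and the first window is left at the fill value ``-1.0``)
>>> import numpy as np
>>> from simba.mixins.statistics_mixin import Statistics
>>> data = np.random.normal(loc=10, scale=2, size=300).astype(np.float64)
>>> sw = Statistics().rolling_shapiro_wilks(data=data, time_window=1.0, fps=10)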
@@ -1333,13 +1569,13 @@ def rolling_shapiro_wilks(self, window_size, results = int(time_window * fps), np.full((data.shape[0]), -1.0) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) + start, end = int((cnt + 1) * window_size), int( + ((cnt + 1) * window_size) + window_size + ) results[start:end] = stats.shapiro(data[i])[0] return results - - # data_sizes = [5, 10, 100, 1000, 10000] # runs = 4 # @@ -1357,25 +1593,14 @@ def rolling_shapiro_wilks(self, # - # for i in data_sizes: # # data = np.hstack([data_1, data_2]) - - # - - - - - - - - # sample_1 = np.random.random_integers(low=1, high=2, size=(10, 50)).astype(np.float64) # sample_2 = np.random.random_integers(low=7, high=20, size=(2, 50)).astype(np.float64) # data = np.vstack([sample_1, sample_2]) @@ -1392,10 +1617,6 @@ def rolling_shapiro_wilks(self, # y = Statistics().polyfit(data=data, deg=1) - - - - # # @jit(nopython=True) # # def fft(data: np.ndarray): # # y = np.full(1, dtype=np.complex128, fill_value=np.nan) @@ -1421,11 +1642,6 @@ def rolling_shapiro_wilks(self, # print(time.time() - start) - - - - - # # # Statistics().rolling_one_way_anova(data=data, time_windows=np.array([0.5]), fps=10) @@ -1436,7 +1652,7 @@ def rolling_shapiro_wilks(self, # test = Statistics().rolling_independent_sample_t(data=data, time_window=1, fps=10) -#sm.tsa.stattools.adfuller(data, maxlag=None, regression='c', autolag='AIC', store=False, regresults=False) +# sm.tsa.stattools.adfuller(data, maxlag=None, regression='c', autolag='AIC', store=False, regresults=False) # sm.tsa.stattools.zivot_andrews(data, maxlag=None, regression='c', autolag=None) # # @@ -1445,11 +1661,11 @@ def rolling_shapiro_wilks(self, # start = time.time() # sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float32) -#data = np.random.randint(0, 4, (200)).astype(np.float32) +# data = np.random.randint(0, 4, (200)).astype(np.float32) # Statistics().pearsons_r(sample_1=sample_1, sample_2=sample_2) # print(time.time() - start) -#results = Statistics().rolling_mann_whitney(data=data, time_windows=np.array([.5]), fps=2.0) +# results = Statistics().rolling_mann_whitney(data=data, time_windows=np.array([.5]), fps=2.0) # import pickle @@ -1476,7 +1692,6 @@ def rolling_shapiro_wilks(self, # print(time.time() - start) - # sample_1 = np.array([5, 2, 3, 4, 5.2]) # sample_2 = np.array([2, 3, 4.1, 5, 6.1]) # @@ -1504,10 +1719,10 @@ def rolling_shapiro_wilks(self, # result = Statistics().levenes(sample_1=sample_1, sample_2=sample_2, critical_values=critical_values) # print(result) -#12.63909108903254 +# 12.63909108903254 -#stats.levene(sample_1, sample_2) +# stats.levene(sample_1, sample_2) # sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float64) # sample_2 = np.array([1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]).astype(np.float64) @@ -1538,7 +1753,7 @@ def rolling_shapiro_wilks(self, # data = np.random.randint(0, 50, (20)).astype(np.float64) -#critical_values = pickle.load(open( "/Users/simon/Desktop/envs/simba_dev/simba/assets/lookups/critical_values_05.pickle","rb")) #['independent_t_test']['one_tail'].values +# critical_values = pickle.load(open( "/Users/simon/Desktop/envs/simba_dev/simba/assets/lookups/critical_values_05.pickle","rb")) #['independent_t_test']['one_tail'].values # test = Statistics().sliding_independent_samples_t(data=data, time_window=1, fps=2, critical_values=critical_values, slide_time=1) # 
data = np.random.randint(0, 5, (200)).astype(np.float64)
@@ -1555,5 +1770,3 @@ def rolling_shapiro_wilks(self,
# for i in range(1000000):
#     test = Statistics().levenes(sample_1=sample_1, sample_2=sample_2)
#     print(time.time() - start)
-
-