From 3219cad1bc04deee164d04b24393fca40d4f94fd Mon Sep 17 00:00:00 2001 From: Simon Nilsson Date: Sat, 14 Oct 2023 16:49:53 -0400 Subject: [PATCH] Add files via upload --- simba/mixins/statistics_mixin.py | 1058 +++++++++++++----------------- 1 file changed, 445 insertions(+), 613 deletions(-) diff --git a/simba/mixins/statistics_mixin.py b/simba/mixins/statistics_mixin.py index e34636a75..b07601b2f 100644 --- a/simba/mixins/statistics_mixin.py +++ b/simba/mixins/statistics_mixin.py @@ -1,16 +1,13 @@ __author__ = "Simon Nilsson" -from typing import Optional, Tuple, Union - from sklearn.neighbors import LocalOutlierFactor - +from typing import Optional, Union, Tuple try: from typing import Literal except: from typing_extensions import Literal - import numpy as np -from numba import jit, njit, objmode, optional, prange, typed, types +from numba import njit, jit, prange, typed, optional, objmode, types from scipy import stats from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin @@ -22,14 +19,26 @@ class Statistics(FeatureExtractionMixin): """ Primarily frequentist statistics methods used for feature extraction or drift assessment. + .. note:: + Many methods have numba typed `signatures `_ to decrease + compilation time. Make sure to pass the correct dtypes as indicated by the signature decorators. + + + .. image:: _static/img/statistics_runtimes.png + :width: 1200 + :align: center + """ def __init__(self): FeatureExtractionMixin.__init__(self) + @staticmethod @jit(nopython=True) - def _hist_1d(data: np.ndarray, bin_count: int, range: np.ndarray): + def _hist_1d(data: np.ndarray, + bin_count: int, + range: np.ndarray): """ Jitted helper to compute 1D histograms with counts. @@ -45,10 +54,10 @@ def _hist_1d(data: np.ndarray, bin_count: int, range: np.ndarray): return hist @staticmethod - @njit("(float64[:], float64, float64)", cache=True) - def rolling_independent_sample_t( - data: np.ndarray, time_window: float, fps: float - ) -> np.ndarray: + @njit('(float64[:], float64, float64)', cache=True) + def rolling_independent_sample_t(data: np.ndarray, + time_window: float, + fps: float) -> np.ndarray: """ Jitted compute of independent-sample t-statistics for sequentially binned values in a time-series.
E.g., compute t-test statistics when comparing ``Feature N`` in the current 1s @@ -77,30 +86,18 @@ def rolling_independent_sample_t( window_size = int(time_window * fps) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int( - ((cnt + 1) * window_size) + window_size - ) - mean_1, mean_2 = np.mean(data[i - 1]), np.mean(data[i]) - stdev_1, stdev_2 = np.std(data[i - 1]), np.std(data[i]) - pooled_std = np.sqrt( - ( - (len(data[i - 1]) - 1) * stdev_1**2 - + (len(data[i]) - 1) * stdev_2**2 - ) - / (len(data[i - 1]) + len(data[i]) - 2) - ) - results[start:end] = (mean_1 - mean_2) / ( - pooled_std * np.sqrt(1 / len(data[i - 1]) + 1 / len(data[i])) - ) + start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) + mean_1, mean_2 = np.mean(data[i-1]), np.mean(data[i]) + stdev_1, stdev_2 = np.std(data[i-1]), np.std(data[i]) + pooled_std = np.sqrt(((len(data[i-1]) - 1) * stdev_1 ** 2 + (len(data[i]) - 1) * stdev_2 ** 2) / (len(data[i-1]) + len(data[i]) - 2)) + results[start:end] = (mean_1 - mean_2) / (pooled_std * np.sqrt(1 / len(data[i-1]) + 1 / len(data[i]))) return results @staticmethod @jit(nopython=True) - def independent_samples_t( - sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[np.ndarray] = None, - ) -> (float, Union[None, bool]): + def independent_samples_t(sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[np.ndarray] = None) -> (float, Union[None, bool]): """ Jitted compute independent-samples t-test statistic and boolean significance between two distributions. @@ -126,18 +123,11 @@ def independent_samples_t( m1, m2 = np.mean(sample_1), np.mean(sample_2) std_1 = np.sqrt(np.sum((sample_1 - m1) ** 2) / (len(sample_1) - 1)) std_2 = np.sqrt(np.sum((sample_2 - m2) ** 2) / (len(sample_2) - 1)) - pooled_std = np.sqrt( - ((len(sample_1) - 1) * std_1**2 + (len(sample_2) - 1) * std_2**2) - / (len(sample_1) + len(sample_2) - 2) - ) - t_statistic = (m1 - m2) / ( - pooled_std * np.sqrt(1 / len(sample_1) + 1 / len(sample_2)) - ) + pooled_std = np.sqrt(((len(sample_1) - 1) * std_1 ** 2 + (len(sample_2) - 1) * std_2 ** 2) / (len(sample_1) + len(sample_2) - 2)) + t_statistic = (m1 - m2) / (pooled_std * np.sqrt(1 / len(sample_1) + 1 / len(sample_2))) if critical_values is not None: dof = (sample_1.shape[0] + sample_2.shape[0]) - 2 - critical_value = np.interp( - dof, critical_values[:, 0], critical_values[:, 1] - ) + critical_value = np.interp(dof, critical_values[:, 0], critical_values[:, 1]) if critical_value < abs(t_statistic): significance_bool = True else: @@ -145,9 +135,11 @@ def independent_samples_t( return t_statistic, significance_bool + @staticmethod - @njit("(float64[:], float64[:])", cache=True) - def cohens_d(sample_1: np.ndarray, sample_2: np.ndarray) -> float: + @njit('(float64[:], float64[:])', cache=True) + def cohens_d(sample_1: np.ndarray, + sample_2: np.ndarray) -> float: """ Jitted compute of Cohen's d between two distributions @@ -161,15 +153,14 @@ def cohens_d(sample_1: np.ndarray, sample_2: np.ndarray) -> float: >>> Statistics().cohens_d(sample_1=sample_1, sample_2=sample_2) >>> -0.5952099775170546 """ - return (np.mean(sample_1) - np.mean(sample_2)) / ( - np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2) - ) + return (np.mean(sample_1) - np.mean(sample_2)) / (np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2)) @staticmethod - @njit("(float64[:], float64[:], float64)", 
cache=True) - def rolling_cohens_d( - data: np.ndarray, time_windows: np.ndarray, fps: float - ) -> np.ndarray: + @njit('(float64[:], float64[:], float64)', cache=True) + def rolling_cohens_d(data: np.ndarray, + time_windows: np.ndarray, + fps: float) -> np.ndarray: + """ Jitted compute of rolling Cohen's D statistic comparing the current time-window of size N to the preceding window of size N. @@ -189,26 +180,23 @@ def rolling_cohens_d( results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) - d = (np.mean(sample_1) - np.mean(sample_2)) / ( - np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2) - ) - results[window_start:window_end, i] = d + sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) + d = (np.mean(sample_1) - np.mean(sample_2)) / (np.sqrt((np.std(sample_1) ** 2 + np.std(sample_2) ** 2) / 2)) + results[window_start: window_end, i] = d return results - def rolling_two_sample_ks( - self, data: np.ndarray, time_window: float, fps: float - ) -> np.ndarray: + + @staticmethod + @njit('(float32[:], float64, float64)') + def rolling_two_sample_ks(data: np.ndarray, + time_window: float, + fps: float) -> np.ndarray: """ - Compute Kolmogorov two-sample statistics for sequentially binned values in a time-series. + Jitted compute Kolmogorov two-sample statistics for sequentially binned values in a time-series. E.g., compute KS statistics when comparing ``Feature N`` in the current 1s time-window, versus ``Feature N`` in the previous 1s time-window. :parameter ndarray data: 1D array of size len(frames) representing feature values. 
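A minimal usage sketch for ``rolling_cohens_d``, assuming float64 inputs as required by the njit signature above (illustrative data):
>>> data = np.random.normal(loc=10, scale=2, size=900).astype(np.float64)
>>> d = Statistics().rolling_cohens_d(data=data, time_windows=np.array([1.0, 2.0]), fps=30.0)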
@@ -218,48 +206,35 @@ def rolling_two_sample_ks( :example: >>> data = np.random.randint(low=0, high=100, size=(200)).astype('float32') - >>> results = self.rolling_two_sample_ks(data=data, group_size_s=1, fps=30) + >>> results = Statistics().rolling_two_sample_ks(data=data, time_window=1, fps=30) """ window_size, results = int(time_window * fps), np.full((data.shape[0]), -1.0) data = np.split(data, list(range(window_size, data.shape[0], window_size))) for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int( - ((cnt + 1) * window_size) + window_size - ) - sample_1, sample_2 = data[i - 1], data2 = data[i] + start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size) + sample_1, sample_2 = data[i-1], data[i] combined_samples = np.sort(np.concatenate((sample_1, sample_2))) - ecdf_sample_1 = np.searchsorted( - sample_1, combined_samples, side="right" - ) / len(sample_1) - ecdf_sample_2 = np.searchsorted( - sample_2, combined_samples, side="right" - ) / len(sample_2) + ecdf_sample_1 = np.searchsorted(sample_1, combined_samples, side='right') / len(sample_1) + ecdf_sample_2 = np.searchsorted(sample_2, combined_samples, side='right') / len(sample_2) ks = np.max(np.abs(ecdf_sample_1 - ecdf_sample_2)) results[start:end] = ks return results @staticmethod @jit(nopython=True) - def two_sample_ks( - sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[bool] = None, - ) -> (float, Union[bool, None]): + def two_sample_ks(sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[bool] = None) -> (float, Union[bool, None]): + significance_bool = None combined_samples = np.sort(np.concatenate((sample_1, sample_2))) - ecdf_sample_1 = np.searchsorted(sample_1, combined_samples, side="right") / len( - sample_1 - ) - ecdf_sample_2 = np.searchsorted(sample_2, combined_samples, side="right") / len( - sample_2 - ) + ecdf_sample_1 = np.searchsorted(sample_1, combined_samples, side='right') / len(sample_1) + ecdf_sample_2 = np.searchsorted(sample_2, combined_samples, side='right') / len(sample_2) ks = np.max(np.abs(ecdf_sample_1 - ecdf_sample_2)) if critical_values is not None: combined_sample_size = len(sample_1) + len(sample_2) - critical_value = np.interp( - combined_sample_size, critical_values[:, 0], critical_values[:, 1] - ) + critical_value = np.interp(combined_sample_size, critical_values[:, 0], critical_values[:, 1]) if critical_value < abs(ks): significance_bool = True else: @@ -269,11 +244,9 @@ def two_sample_ks( @staticmethod @jit(nopython=True) - def one_way_anova( - sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[np.ndarray] = None, - ) -> (float, float): + def one_way_anova(sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[np.ndarray] = None) -> (float, float): """ Jitted compute of one-way ANOVA F statistics and associated p-value for two distributions. 
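A minimal usage sketch for ``two_sample_ks`` (illustrative data; without ``critical_values`` the significance element of the returned tuple is ``None``):
>>> sample_1 = np.random.normal(loc=10, scale=2, size=100).astype(np.float64)
>>> sample_2 = np.random.normal(loc=12, scale=2, size=100).astype(np.float64)
>>> ks, _ = Statistics().two_sample_ks(sample_1=sample_1, sample_2=sample_2)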
@@ -290,19 +263,15 @@ def one_way_anova( significance_bool = True n1, n2 = len(sample_1), len(sample_2) m1, m2 = np.mean(sample_1), np.mean(sample_2) - ss_between = ( - n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 - + n2 * (m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 - ) + ss_between = n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + n2 * ( + m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 ss_within = np.sum((sample_1 - m1) ** 2) + np.sum((sample_2 - m2) ** 2) df_between, df_within = 1, n1 + n2 - 2 ms_between, ms_within = ss_between / df_between, ss_within / df_within f = ms_between / ms_within if critical_values is not None: critical_values = critical_values[:, np.array([0, df_between])] - critical_value = np.interp( - df_within, critical_values[:, 0], critical_values[:, 1] - ) + critical_value = np.interp(df_within, critical_values[:, 0], critical_values[:, 1]) if f > critical_value: significance_bool = True else: @@ -311,10 +280,11 @@ def one_way_anova( return (f, significance_bool) @staticmethod - @njit("(float32[:], float64[:], float64)", cache=True) - def rolling_one_way_anova( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], float64)', cache=True) + def rolling_one_way_anova(data: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: + """ Jitted compute of rolling one-way ANOVA F-statistic comparing the current time-window of size N to the preceding window of size N. @@ -336,38 +306,28 @@ def rolling_one_way_anova( results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) n1, n2 = len(sample_1), len(sample_2) m1, m2 = np.mean(sample_1), np.mean(sample_2) - ss_between = ( - n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 - + n2 * (m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 - ) + ss_between = n1 * (m1 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 + n2 * (m2 - np.mean(np.concatenate((sample_1, sample_2)))) ** 2 ss_within = np.sum((sample_1 - m1) ** 2) + np.sum((sample_2 - m2) ** 2) df_between, df_within = 1, n1 + n2 - 2 ms_between, ms_within = ss_between / df_between, ss_within / df_within f = ms_between / ms_within - results[window_start:window_end, i] = f + results[window_start: window_end, i] = f return results - def kullback_leibler_divergence( - self, - sample_1: np.ndarray, - sample_2: np.ndarray, - fill_value: int = 1, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> float: + def kullback_leibler_divergence(self, + sample_1: np.ndarray, + sample_2: np.ndarray, + fill_value: int = 1, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> float: + """ Compute Kullback-Leibler divergence between two distributions. 
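A minimal usage sketch for ``kullback_leibler_divergence``, assuming two 1D arrays of feature values (illustrative data; ``bucket_method`` choice is an assumption):
>>> sample_1 = np.random.normal(loc=10, scale=2, size=100)
>>> sample_2 = np.random.normal(loc=11, scale=3, size=100)
>>> kl = Statistics().kullback_leibler_divergence(sample_1=sample_1, sample_2=sample_2, fill_value=1, bucket_method='fd')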
@@ -381,33 +341,19 @@ def kullback_leibler_divergence( """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_1_hist[sample_1_hist == 0] = fill_value - sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum( - sample_1_hist - ), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) return stats.entropy(pk=sample_1_hist, qk=sample_2_hist) - def rolling_kullback_leibler_divergence( - self, - data: np.ndarray, - time_windows: np.ndarray, - fps: int, - fill_value: int = 1, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> np.ndarray: + def rolling_kullback_leibler_divergence(self, + data: np.ndarray, + time_windows: np.ndarray, + fps: int, + fill_value: int = 1, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: + """ Compute rolling Kullback-Leibler divergence comparing the current time-window of size N to the preceding window of size N. @@ -430,43 +376,25 @@ def rolling_kullback_leibler_divergence( results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_1_hist[sample_1_hist == 0] = fill_value - sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum( - sample_1_hist - ), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) kl = stats.entropy(pk=sample_1_hist, qk=sample_2_hist) - results[window_start:window_end, i] = kl + results[window_start: window_end, i] = kl 
return results - def jensen_shannon_divergence( - self, - sample_1: np.ndarray, - sample_2: np.ndarray, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> float: + def jensen_shannon_divergence(self, + sample_1: np.ndarray, + sample_2: np.ndarray, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> float: + """ Compute Jensen-Shannon divergence between two distributions. Useful for (i) measure drift in datasets, and (ii) featurization of distribution shifts across @@ -484,31 +412,17 @@ def jensen_shannon_divergence( """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) mean_hist = np.mean([sample_1_hist, sample_2_hist], axis=0) - kl_sample_1, kl_sample_2 = stats.entropy( - pk=sample_1_hist, qk=mean_hist - ), stats.entropy(pk=sample_2_hist, qk=mean_hist) + kl_sample_1, kl_sample_2 = stats.entropy(pk=sample_1_hist, qk=mean_hist), stats.entropy(pk=sample_2_hist, qk=mean_hist) return (kl_sample_1 + kl_sample_2) / 2 - def rolling_jensen_shannon_divergence( - self, - data: np.ndarray, - time_windows: np.ndarray, - fps=int, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> np.ndarray: + def rolling_jensen_shannon_divergence(self, + data: np.ndarray, + time_windows: np.ndarray, + fps = int, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: """ Compute rolling Jensen-Shannon divergence comparing the current time-window of size N to the preceding window of size N. 
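A minimal usage sketch for ``jensen_shannon_divergence`` (illustrative data; ``bucket_method`` choice is an assumption):
>>> sample_1 = np.random.normal(loc=10, scale=2, size=100)
>>> sample_2 = np.random.normal(loc=12, scale=3, size=100)
>>> js = Statistics().jensen_shannon_divergence(sample_1=sample_1, sample_2=sample_2, bucket_method='fd')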
@@ -522,52 +436,39 @@ def rolling_jensen_shannon_divergence( results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_1_hist, sample_2_hist = sample_1_hist / np.sum( - sample_1_hist - ), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) mean_hist = np.mean([sample_1_hist, sample_2_hist], axis=0) - kl_sample_1, kl_sample_2 = stats.entropy( - pk=sample_1_hist, qk=mean_hist - ), stats.entropy(pk=sample_2_hist, qk=mean_hist) + kl_sample_1, kl_sample_2 = stats.entropy(pk=sample_1_hist, qk=mean_hist), stats.entropy(pk=sample_2_hist, qk=mean_hist) js = (kl_sample_1 + kl_sample_2) / 2 - results[window_start:window_end, i] = js + results[window_start: window_end, i] = js return results - def wasserstein_distance( - self, - sample_1: np.ndarray, - sample_2: np.ndarray, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> float: + + def wasserstein_distance(self, + sample_1: np.ndarray, + sample_2: np.ndarray, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> float: + """ Compute Wasserstein distance between two distributions. + .. note:: + Uses ``stats.wasserstein_distance``. I have tried to move ``stats.wasserstein_distance`` to jitted method extensively, + but this doesn't give significant runtime improvement. Rate-limiter appears to be the _hist_1d. + :parameter ndarray sample_1: First 1d array representing feature values. :parameter ndarray sample_2: Second 1d array representing feature values. :parameter Literal bucket_method: Estimator determining optimal bucket count and bucket width. 
Default: The maximum of the Sturges and Freedman-Diaconis estimators :returns float: Wasserstein distance between ``sample_1`` and ``sample_2`` + :example: >>> sample_1 = np.random.normal(loc=10, scale=2, size=10) @@ -577,32 +478,18 @@ def wasserstein_distance( """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_1_hist, sample_2_hist = sample_1_hist / np.sum( - sample_1_hist - ), sample_2_hist / np.sum(sample_2_hist) - return stats.wasserstein_distance( - u_values=sample_1_hist, v_values=sample_2_hist - ) - - def rolling_wasserstein_distance( - self, - data: np.ndarray, - time_windows: np.ndarray, - fps: int, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> np.ndarray: + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) + return stats.wasserstein_distance(u_values=sample_1_hist, v_values=sample_2_hist) + + + def rolling_wasserstein_distance(self, + data: np.ndarray, + time_windows: np.ndarray, + fps: int, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: + """ Compute rolling Wasserstein distance comparing the current time-window of size N to the preceding window of size N. 
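A minimal usage sketch for ``rolling_wasserstein_distance``, assuming a 1D feature vector and window sizes given in seconds (illustrative data):
>>> data = np.random.randint(low=0, high=100, size=(400,)).astype(np.float64)
>>> w = Statistics().rolling_wasserstein_distance(data=data, time_windows=np.array([1.0, 2.0]), fps=30)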
@@ -617,45 +504,25 @@ def rolling_wasserstein_distance( results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j-1].astype(np.float32), data_split[j].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_1_hist, sample_2_hist = sample_1_hist / np.sum( - sample_1_hist - ), sample_2_hist / np.sum(sample_2_hist) - w = stats.wasserstein_distance( - u_values=sample_1_hist, v_values=sample_2_hist - ) - results[window_start:window_end, i] = w + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) + w = stats.wasserstein_distance(u_values=sample_1_hist, v_values=sample_2_hist) + results[window_start: window_end, i] = w return results - def population_stability_index( - self, - sample_1: np.ndarray, - sample_2: np.ndarray, - fill_value: int = 1, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> float: + def population_stability_index(self, + sample_1: np.ndarray, + sample_2: np.ndarray, + fill_value: int = 1, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> float: """ Compute Population Stability Index (PSI) comparing two distributions. 
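A minimal usage sketch for ``population_stability_index`` (illustrative data; ``fill_value`` guards against empty buckets as in the body below):
>>> sample_1 = np.random.randint(low=0, high=100, size=(100,)).astype(np.float64)
>>> sample_2 = np.random.randint(low=0, high=100, size=(100,)).astype(np.float64)
>>> psi = Statistics().population_stability_index(sample_1=sample_1, sample_2=sample_2, fill_value=1, bucket_method='auto')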
@@ -671,35 +538,20 @@ def population_stability_index( """ bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_1_hist[sample_1_hist == 0] = fill_value - sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum( - sample_1_hist - ), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) samples_diff = sample_2_hist - sample_1_hist log = np.log(sample_2_hist / sample_1_hist) return np.sum(samples_diff * log) - def rolling_population_stability_index( - self, - data: np.ndarray, - time_windows: np.ndarray, - fps: int, - fill_value: int = 1, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ) -> np.ndarray: + def rolling_population_stability_index(self, + data: np.ndarray, + time_windows: np.ndarray, + fps: int, + fill_value: int = 1, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto') -> np.ndarray: """ Compute rolling Population Stability Index (PSI) comparing the current time-window of size N to the preceding window of size N. 
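A minimal usage sketch for ``rolling_population_stability_index``, comparing each window to the preceding window of the same size (illustrative data):
>>> data = np.random.randint(low=0, high=100, size=(400,)).astype(np.float64)
>>> psi = Statistics().rolling_population_stability_index(data=data, time_windows=np.array([1.0]), fps=30)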
@@ -717,68 +569,29 @@ def rolling_population_stability_index( results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) bin_width, bin_count = bucket_data(data=sample_1, method=bucket_method) - sample_1_hist = self._hist_1d( - data=sample_1, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_2_hist = self._hist_1d( - data=sample_2, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - sample_1_hist[sample_1_hist == 0] = fill_value - sample_2_hist[sample_2_hist == 0] = fill_value - sample_1_hist, sample_2_hist = sample_1_hist / np.sum( - sample_1_hist - ), sample_2_hist / np.sum(sample_2_hist) + sample_1_hist = self._hist_1d(data=sample_1, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_2_hist = self._hist_1d(data=sample_2, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + sample_1_hist[sample_1_hist == 0] = fill_value; sample_2_hist[sample_2_hist == 0] = fill_value + sample_1_hist, sample_2_hist = sample_1_hist / np.sum(sample_1_hist), sample_2_hist / np.sum(sample_2_hist) samples_diff = sample_2_hist - sample_1_hist log = np.log(sample_2_hist / sample_1_hist) psi = np.sum(samples_diff * log) - results[window_start:window_end, i] = psi + results[window_start: window_end, i] = psi return results - def shapiro_wilks( - self, data: np.ndarray, time_window: float, fps: int - ) -> np.ndarray: - """ - Compute Shapiro-Wilks normality statistics for sequentially binned values in a time-series. E.g., compute - the normality statistics of ``Feature N`` in each window of ``time_window`` seconds. - - :parameter ndarray data: 1D array of size len(frames) representing feature values. - :parameter int time_window: The size of the buckets in seconds. - :parameter int fps: Frame-rate of recorded video. - :return np.ndarray: Array of size data.shape[0] with Shapiro-Wilks normality statistics - - :example: - >>> data = np.random.randint(low=0, high=100, size=(200)).astype('float32') - >>> results = self.shapiro_wilks(data=data, time_window=1, fps=30) - """ - - window_size, results = int(time_window * fps), np.full((data.shape[0]), -1.0) - data = np.split(data, list(range(window_size, data.shape[0], window_size))) - for cnt, i in enumerate(prange(1, len(data))): - start, end = int((cnt + 1) * window_size), int( - ((cnt + 1) * window_size) + window_size - ) - results[start:end] = stats.shapiro(data[i])[0] - return results @staticmethod - @njit("(float64[:], float64[:])", cache=True) - # @jit(nopython=True) - def kruskal_wallis(sample_1: np.ndarray, sample_2: np.ndarray) -> float: + @njit('(float64[:], float64[:])', cache=True) + def kruskal_wallis(sample_1: np.ndarray, + sample_2: np.ndarray) -> float: + """ Jitted compute of Kruskal-Wallis H between two distributions. 
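For reference, the two-sample Kruskal-Wallis H that the ``h1``, ``h2`` and ``h3`` terms below correspond to (without tie correction) is H = 12 / (N * (N + 1)) * (R1**2 / n1 + R2**2 / n2) - 3 * (N + 1), where R1 and R2 are the summed ranks of each sample in the pooled ranking, n1 and n2 the sample sizes, and N = n1 + n2.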
@@ -792,23 +605,23 @@ def kruskal_wallis(sample_1: np.ndarray, sample_2: np.ndarray) -> float: >>> results = Statistics().kruskal_wallis(sample_1=sample_1, sample_2=sample_2) """ - # sample_1 = np.concatenate((np.zeros((sample_1.shape[0], 1)), sample_1.reshape(-1, 1)), axis=1) - # sample_2 = np.concatenate((np.ones((sample_2.shape[0], 1)), sample_2.reshape(-1, 1)), axis=1) + #sample_1 = np.concatenate((np.zeros((sample_1.shape[0], 1)), sample_1.reshape(-1, 1)), axis=1) + #sample_2 = np.concatenate((np.ones((sample_2.shape[0], 1)), sample_2.reshape(-1, 1)), axis=1) data = np.vstack((sample_1, sample_2)) ranks = fast_mean_rank(data=data[:, 1], descending=False) data = np.hstack((data, ranks.reshape(-1, 1))) - sample_1_summed_rank = np.sum(data[0 : sample_1.shape[0], 2].flatten()) - sample_2_summed_rank = np.sum(data[sample_1.shape[0] :, 2].flatten()) + sample_1_summed_rank = np.sum(data[0: sample_1.shape[0], 2].flatten()) + sample_2_summed_rank = np.sum(data[sample_1.shape[0]:, 2].flatten()) h1 = 12 / (data.shape[0] * (data.shape[0] + 1)) - h2 = (np.square(sample_1_summed_rank) / sample_1.shape[0]) + ( - np.square(sample_2_summed_rank) / sample_2.shape[0] - ) + h2 = (np.square(sample_1_summed_rank) / sample_1.shape[0]) + (np.square(sample_2_summed_rank) / sample_2.shape[0]) h3 = 3 * (data.shape[0] + 1) - return h1 * h2 - h3 + return h1 * h2 - h3 @staticmethod - @njit("(float64[:], float64[:])", cache=True) - def mann_whitney(sample_1: np.ndarray, sample_2: np.ndarray) -> float: + @njit('(float64[:], float64[:])', cache=True) + def mann_whitney(sample_1: np.ndarray, + sample_2: np.ndarray) -> float: + """ Jitted compute of Mann-Whitney U between two distributions. @@ -831,11 +644,13 @@ def mann_whitney(sample_1: np.ndarray, sample_2: np.ndarray) -> float: u2 = n1 * n2 - u1 return min(u1, u2) + @staticmethod @jit(nopython=True, cache=True) - def levenes( - sample_1: np.ndarray, sample_2: np.ndarray, critical_values: np.ndarray - ) -> (float, Union[bool, None]): + def levenes(sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: np.ndarray) -> (float, Union[bool, None]): + """ Jitted compute of two-sample Leven's W. 
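A minimal usage sketch for the ``mann_whitney`` method defined in the hunk above, assuming float64 samples per its njit signature (illustrative data):
>>> sample_1 = np.random.normal(loc=10, scale=2, size=100).astype(np.float64)
>>> sample_2 = np.random.normal(loc=11, scale=2, size=100).astype(np.float64)
>>> u = Statistics().mann_whitney(sample_1=sample_1, sample_2=sample_2)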
@@ -860,14 +675,10 @@ def levenes( Ni_x, Ni_y = len(sample_1), len(sample_2) Yci_x, Yci_y = np.median(sample_1), np.median(sample_2) Ntot = Ni_x + Ni_y - Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs( - sample_2 - Yci_y - ).astype(np.float32) + Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs(sample_2 - Yci_y).astype(np.float32) Zbari_x, Zbari_y = np.mean(Zij_x), np.mean(Zij_y) Zbar = ((Zbari_x * Ni_x) + (Zbari_y * Ni_y)) / Ntot - numer = (Ntot - 2) * np.sum( - np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2 - ) + numer = (Ntot - 2) * np.sum(np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2) dvar = np.sum((Zij_x - Zbari_x) ** 2) + np.sum((Zij_y - Zbari_y) ** 2) denom = (2 - 1.0) * dvar l_statistic = numer / denom @@ -876,9 +687,7 @@ def levenes( dfn, dfd = 1, (Ni_x + Ni_y) - 2 idx = (np.abs(critical_values[0][1:] - dfd)).argmin() + 1 critical_values = critical_values[1:, np.array([0, idx])] - critical_value = np.interp( - dfd, critical_values[:, 0], critical_values[:, 1] - ) + critical_value = np.interp(dfd, critical_values[:, 0], critical_values[:, 1]) if l_statistic >= critical_value: significance_bool = True else: @@ -887,8 +696,11 @@ def levenes( return (l_statistic, significance_bool) @staticmethod - @njit("(float64[:], float64[:], float64)", cache=True) - def rolling_levenes(data: np.ndarray, time_windows: np.ndarray, fps: float): + @njit('(float64[:], float64[:], float64)', cache=True) + def rolling_levenes(data: np.ndarray, + time_windows: np.ndarray, + fps: float): + """ Jitted compute of rolling Levene's W comparing the current time-window of size N to the preceding window of size N. @@ -907,35 +719,29 @@ def rolling_levenes(data: np.ndarray, time_windows: np.ndarray, fps: float): results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) Ni_x, Ni_y = len(sample_1), len(sample_2) Yci_x, Yci_y = np.median(sample_1), np.median(sample_2) Ntot = Ni_x + Ni_y - Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs( - sample_2 - Yci_y - ).astype(np.float32) + Zij_x, Zij_y = np.abs(sample_1 - Yci_x).astype(np.float32), np.abs(sample_2 - Yci_y).astype(np.float32) Zbari_x, Zbari_y = np.mean(Zij_x), np.mean(Zij_y) Zbar = ((Zbari_x * Ni_x) + (Zbari_y * Ni_y)) / Ntot - numer = (Ntot - 2) * np.sum( - np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2 - ) + numer = (Ntot - 2) * np.sum(np.array([Ni_x, Ni_y]) * (np.array([Zbari_x, Zbari_y]) - Zbar) ** 2) dvar = np.sum((Zij_x - Zbari_x) ** 2) + np.sum((Zij_y - Zbari_y) ** 2) denom = (2 - 1.0) * dvar w = numer / denom - results[window_start:window_end, i] = w + results[window_start: window_end, i] = w return results + @staticmethod @jit(nopython=True, cache=True) - def brunner_munzel(sample_1: np.ndarray, sample_2: np.ndarray) -> float: + def brunner_munzel(sample_1: np.ndarray, + sample_2: np.ndarray) -> float: """ Jitted compute of Brunner-Munzel W between two distributions. 
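A minimal usage sketch for ``rolling_levenes``, assuming float64 data and window sizes in seconds per its njit signature (illustrative data):
>>> data = np.random.randint(low=0, high=50, size=(200,)).astype(np.float64)
>>> w = Statistics().rolling_levenes(data=data, time_windows=np.array([1.0]), fps=10.0)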
@@ -953,7 +759,7 @@ def brunner_munzel(sample_1: np.ndarray, sample_2: np.ndarray) -> float: """ nx, ny = len(sample_1), len(sample_2) rankc = fast_mean_rank(np.concatenate((sample_1, sample_2))) - rankcx, rankcy = rankc[0:nx], rankc[nx : nx + ny] + rankcx, rankcy = rankc[0:nx], rankc[nx:nx + ny] rankcx_mean, rankcy_mean = np.mean(rankcx), np.mean(rankcy) rankx, ranky = fast_mean_rank(sample_1), fast_mean_rank(sample_2) rankx_mean, ranky_mean = np.mean(rankx), np.mean(ranky) @@ -963,42 +769,38 @@ def brunner_munzel(sample_1: np.ndarray, sample_2: np.ndarray) -> float: wbfn /= (nx + ny) * np.sqrt(nx * Sx + ny * Sy) return -wbfn + @staticmethod - @jit(nopython=True) - def rolling_barletts_test(data: np.ndarray, time_windows: np.ndarray, fps: float): + @njit('(float32[:], float64[:], float64)', cache=True) + def rolling_barletts_test(data: np.ndarray, + time_windows: np.ndarray, + fps: float): + results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) n_1 = len(sample_1) n_2 = len(sample_2) N = n_1 + n_2 - mean_variance_1 = np.sum((sample_1 - np.mean(sample_1)) ** 2) / ( - n_1 - 1 - ) - mean_variance_2 = np.sum((sample_2 - np.mean(sample_2)) ** 2) / ( - n_2 - 1 - ) - numerator = (N - 2) * ( - np.log(mean_variance_1) + np.log(mean_variance_2) - ) + mean_variance_1 = np.sum((sample_1 - np.mean(sample_1)) ** 2) / (n_1 - 1) + mean_variance_2 = np.sum((sample_2 - np.mean(sample_2)) ** 2) / (n_2 - 1) + numerator = (N - 2) * (np.log(mean_variance_1) + np.log(mean_variance_2)) denominator = 1 / (n_1 - 1) + 1 / (n_2 - 1) u = numerator / denominator - results[window_start:window_end, i] = u + results[window_start: window_end, i] = u return results @staticmethod - @njit("(float32[:], float32[:])") - def pearsons_r(sample_1: np.ndarray, sample_2: np.ndarray): + @njit('(float32[:], float32[:])') + def pearsons_r(sample_1: np.ndarray, + sample_2: np.ndarray): + """ :example: >>> sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float32) @@ -1008,15 +810,15 @@ def pearsons_r(sample_1: np.ndarray, sample_2: np.ndarray): m1, m2 = np.mean(sample_1), np.mean(sample_2) numerator = np.sum((sample_1 - m1) * (sample_2 - m2)) - denominator = np.sqrt( - np.sum((sample_1 - m1) ** 2) * np.sum((sample_2 - m2) ** 2) - ) + denominator = np.sqrt(np.sum((sample_1 - m1) ** 2) * np.sum((sample_2 - m2) ** 2)) r = numerator / denominator return r @staticmethod - @njit("(float32[:], float32[:])") - def spearman_rank_correlation(sample_1: np.ndarray, sample_2: np.ndarray): + @njit('(float32[:], float32[:])') + def spearman_rank_correlation(sample_1: np.ndarray, + sample_2: np.ndarray): + """ :example: >>> sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float32) @@ -1025,17 +827,17 @@ def spearman_rank_correlation(sample_1: np.ndarray, sample_2: np.ndarray): >>> 0.0003979206085205078 """ - rank_x, rank_y = np.argsort(np.argsort(sample_1)), np.argsort( - np.argsort(sample_2) - ) + rank_x, rank_y = np.argsort(np.argsort(sample_1)), 
np.argsort(np.argsort(sample_2)) d_squared = np.sum((rank_x - rank_y) ** 2) return 1 - (6 * d_squared) / (len(sample_1) * (len(sample_2) ** 2 - 1)) + @staticmethod - @njit("(float32[:], float32[:], float64[:], int64)") - def sliding_pearsons_r( - sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float32[:], float64[:], int64)') + def sliding_pearsons_r(sample_1: np.ndarray, + sample_2: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: """ Given two 1D arrays of size N, create sliding window of size time_windows[i] * fps and return Pearson's R between the values in the two 1D arrays in each window. Address "what is the correlation between Feature 1 and @@ -1058,30 +860,30 @@ def sliding_pearsons_r( >>> [[-1.][-1.][-1.][-1.][0.227][-0.319][-0.196][0.474][-0.061][0.713]] """ + results = np.full((sample_1.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - for left, right in zip( - prange(0, sample_1.shape[0] + 1), - prange(window_size, sample_1.shape[0] + 1), - ): + for left, right in zip(prange(0, sample_1.shape[0]+1), prange(window_size, sample_1.shape[0]+1)): s1, s2 = sample_1[left:right], sample_2[left:right] m1, m2 = np.mean(s1), np.mean(s2) numerator = np.sum((s1 - m1) * (s2 - m2)) denominator = np.sqrt(np.sum((s1 - m1) ** 2) * np.sum((s2 - m2) ** 2)) - r = numerator / denominator - results[right - 1, i] = r + if denominator != 0: + r = numerator / denominator + results[right-1, i] = r + else: + results[right - 1, i] = -1.0 return results @staticmethod @jit(nopython=True) - def chi_square( - sample_1: np.ndarray, - sample_2: np.ndarray, - critical_values: Optional[np.ndarray] = None, - type: Optional[str] = "goodness_of_fit", - ) -> Tuple[float, Union[bool, None]]: + def chi_square(sample_1: np.ndarray, + sample_2: np.ndarray, + critical_values: Optional[np.ndarray] = None, + type: Optional[str] = 'goodness_of_fit') -> Tuple[float, Union[bool, None]]: + """ Jitted compute of chi square between two categorical distributions. @@ -1107,9 +909,8 @@ def chi_square( sample_2_counts = np.zeros(len(unique_categories), dtype=np.int64) for i in prange(len(unique_categories)): - sample_1_counts[i], sample_2_counts[i] = np.sum( - sample_1 == unique_categories[i] - ), np.sum(sample_2 == unique_categories[i]) + sample_1_counts[i], sample_2_counts[i] = np.sum(sample_1 == unique_categories[i]), np.sum( + sample_2 == unique_categories[i]) for i in prange(len(unique_categories)): count_1, count_2 = sample_1_counts[i], sample_2_counts[i] @@ -1119,7 +920,7 @@ def chi_square( chi_square += ((count_1 - count_2) ** 2) / (count_2 + 1) if critical_values is not None: - if type == "goodness_of_fit": + if type == 'goodness_of_fit': df = unique_categories.shape[0] - 1 else: df = (len(sample_1_counts) - 1) * (len(sample_2_counts) - 1) @@ -1132,15 +933,14 @@ def chi_square( return chi_square, significance_bool + @staticmethod - @njit("(float32[:], float32, float32, float32[:,:], float32)") - def sliding_independent_samples_t( - data: np.ndarray, - time_window: float, - slide_time: float, - critical_values: np.ndarray, - fps: float, - ) -> np.ndarray: + @njit('(float32[:], float32, float32, float32[:,:], float32)') + def sliding_independent_samples_t(data: np.ndarray, + time_window: float, + slide_time: float, + critical_values: np.ndarray, + fps: float) -> np.ndarray: """ Jitted compute of sliding independent sample t-test. 
Compares the feature values in current time-window to prior time-windows to find the length in time to the most recent time-window where a significantly different @@ -1167,38 +967,33 @@ def sliding_independent_samples_t( results = np.full((data.shape[0]), 0.0) window_size, slide_size = int(time_window * fps), int(slide_time * fps) for i in range(1, data.shape[0]): - sample_1_left, sample_1_right = i, i + window_size - sample_2_left, sample_2_right = ( - sample_1_left - slide_size, - sample_1_right - slide_size, - ) + sample_1_left, sample_1_right = i, i+window_size + sample_2_left, sample_2_right = sample_1_left-slide_size, sample_1_right-slide_size sample_1 = data[sample_1_left:sample_1_right] dof, steps_taken = (sample_1.shape[0] + sample_1.shape[0]) - 2, 1 while sample_2_left >= 0: sample_2 = data[sample_2_left:sample_2_right] - t_statistic = (np.mean(sample_1) - np.mean(sample_2)) / np.sqrt( - (np.std(sample_1) / sample_1.shape[0]) - + (np.std(sample_2) / sample_1.shape[0]) - ) - critical_val = critical_values[dof - 1][1] + t_statistic = (np.mean(sample_1) - np.mean(sample_2)) / np.sqrt((np.std(sample_1) / sample_1.shape[0]) + (np.std(sample_2) / sample_1.shape[0])) + critical_val = critical_values[dof-1][1] if t_statistic >= critical_val: break else: - sample_2_left -= 1 - sample_2_right -= 1 - steps_taken += 1 + sample_2_left -= 1; sample_2_right -= 1; steps_taken += 1 if sample_2_left < 0: steps_taken = -1 if steps_taken == -1: results[i + window_size] = -1 else: - results[i + window_size] = steps_taken * slide_time + results[i+window_size] = steps_taken * slide_time return results @staticmethod - @njit("(float32[:], float64[:], float32)") - def rolling_mann_whitney(data: np.ndarray, time_windows: np.ndarray, fps: float): + @njit('(float32[:], float64[:], float32)') + def rolling_mann_whitney(data: np.ndarray, + time_windows: np.ndarray, + fps: float): + """ Jitted compute of rolling Mann-Whitney U comparing the current time-window of size N to the preceding window of size N. 
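A minimal usage sketch for ``rolling_mann_whitney``, mirroring the float32 data / float64 window dtypes in the signature above (illustrative data):
>>> data = np.random.randint(low=0, high=4, size=(200,)).astype(np.float32)
>>> u = Statistics().rolling_mann_whitney(data=data, time_windows=np.array([1.0]), fps=10.0)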
@@ -1220,35 +1015,36 @@ def rolling_mann_whitney(data: np.ndarray, time_windows: np.ndarray, fps: float) results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - data_split = np.split( - data, list(range(window_size, data.shape[0], window_size)) - ) + data_split = np.split(data, list(range(window_size, data.shape[0], window_size))) for j in prange(1, len(data_split)): window_start = int(window_size * j) window_end = int(window_start + window_size) - sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[ - j - ].astype(np.float32) + sample_1, sample_2 = data_split[j - 1].astype(np.float32), data_split[j].astype(np.float32) n1, n2 = sample_1.shape[0], sample_2.shape[0] ranked = fast_mean_rank(np.concatenate((sample_1, sample_2))) u1 = n1 * n2 + (n1 * (n1 + 1)) / 2.0 - np.sum(ranked[:n1], axis=0) u2 = n1 * n2 - u1 u = min(u1, u2) - results[window_start:window_end, i] = u + results[window_start: window_end, i] = u return results + def chow_test(self): pass def adf_test(self): pass + @staticmethod - @njit("(float32[:], float32[:], float64[:], int64)") - def sliding_spearman_rank_correlation( - sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float32[:], float64[:], int64)') + def sliding_spearman_rank_correlation(sample_1: np.ndarray, + sample_2: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: + + """ Given two 1D arrays of size N, create sliding window of size time_windows[i] * fps and return Spearman's rank correlation between the values in the two 1D arrays in each window. Address "what is the correlation between Feature 1 and @@ -1274,23 +1070,22 @@ def sliding_spearman_rank_correlation( results = np.full((sample_1.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): window_size = int(time_windows[i] * fps) - for left, right in zip( - prange(0, sample_1.shape[0] + 1), - prange(window_size, sample_1.shape[0] + 1), - ): + for left, right in zip(prange(0, sample_1.shape[0] + 1), prange(window_size, sample_1.shape[0] + 1)): s1, s2 = sample_1[left:right], sample_2[left:right] rank_x, rank_y = np.argsort(np.argsort(s1)), np.argsort(np.argsort(s2)) d_squared = np.sum((rank_x - rank_y) ** 2) s = 1 - (6 * d_squared) / (len(s1) * (len(s2) ** 2 - 1)) - results[right - 1, i] = s + results[right-1, i] = s return results + @staticmethod - @njit("(float32[:], float64, float64, float64)") - def sliding_autocorrelation( - data: np.ndarray, max_lag: float, time_window: float, fps: float - ): + @njit('(float32[:], float64, float64, float64)') + def sliding_autocorrelation(data: np.ndarray, + max_lag: float, + time_window: float, + fps: float): """ Jitted compute of sliding auto-correlations (the correlation of a feature with itself using lagged windows). 
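A minimal usage sketch for ``sliding_spearman_rank_correlation``, assuming float32 samples and an integer fps per its njit signature (illustrative data):
>>> sample_1 = np.random.randint(low=0, high=50, size=(100,)).astype(np.float32)
>>> sample_2 = np.random.randint(low=0, high=50, size=(100,)).astype(np.float32)
>>> s = Statistics().sliding_spearman_rank_correlation(sample_1=sample_1, sample_2=sample_2, time_windows=np.array([0.5, 1.0]), fps=10)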
@@ -1302,9 +1097,9 @@ def sliding_autocorrelation( max_frm_lag, time_window_frms = int(max_lag * fps), int(time_window * fps) results = np.full((data.shape[0]), 0.0) - for right in prange(time_window_frms - 1, data.shape[0]): - left = right - time_window_frms + 1 - w_data = data[left : right + 1] + for right in prange(time_window_frms-1, data.shape[0]): + left = right - time_window_frms+1 + w_data = data[left:right+1] corrcfs = np.full((max_frm_lag), np.nan) corrcfs[0] = 1 for shift in range(1, max_frm_lag): @@ -1313,65 +1108,63 @@ def sliding_autocorrelation( const = np.ones_like(corrcfs) mat_[:, 0] = const mat_[:, 1] = corrcfs - det_ = np.linalg.lstsq( - mat_.astype(np.float32), np.arange(0, max_frm_lag).astype(np.float32) - )[0] + det_ = np.linalg.lstsq(mat_.astype(np.float32), np.arange(0, max_frm_lag).astype(np.float32))[0] results[right] = det_[::-1][0] return results @staticmethod - def dominant_frequencies( - data: np.ndarray, - fps: float, - k: int, - window_function: Literal["Hann", "Hamming", "Blackman"] = None, - ): - """Find the K dominant frequencies within a feature vector""" - - if window_function == "Hann": + def dominant_frequencies(data: np.ndarray, + fps: float, + k: int, + window_function: Literal['Hann', 'Hamming', 'Blackman'] = None): + + """ Find the K dominant frequencies within a feature vector """ + + if window_function == 'Hann': data = data * np.hanning(len(data)) - elif window_function == "Hamming": + elif window_function == 'Hamming': data = data * np.hamming(len(data)) - elif window_function == "Blackman": + elif window_function == 'Blackman': data = data * np.blackman(len(data)) fft_result = np.fft.fft(data) frequencies = np.fft.fftfreq(data.shape[0], 1 / fps) magnitude = np.abs(fft_result) - return frequencies[np.argsort(magnitude)[-(k + 1) : -1]] + return frequencies[np.argsort(magnitude)[-(k + 1):-1]] @staticmethod - def sliding_dominant_frequencies( - data: np.ndarray, - fps: float, - k: int, - time_windows: np.ndarray, - window_function: Literal["Hann", "Hamming", "Blackman"] = None, - ): - """Find the K dominant frequencies within a feature vector using sliding windows""" + def sliding_dominant_frequencies(data: np.ndarray, + fps: float, + k: int, + time_windows: np.ndarray, + window_function: Literal['Hann', 'Hamming', 'Blackman'] = None): + + """ Find the K dominant frequencies within a feature vector using sliding windows """ + results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for time_window_cnt in range(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for left, right in zip( - range(0, data.shape[0] + 1), range(window_size, data.shape[0] + 1) - ): + for left, right in zip(range(0, data.shape[0] + 1), range(window_size, data.shape[0] + 1)): window_data = data[left:right] - if window_function == "Hann": + if window_function == 'Hann': window_data = window_data * np.hanning(len(window_data)) - elif window_function == "Hamming": + elif window_function == 'Hamming': window_data = window_data * np.hamming(len(window_data)) - elif window_function == "Blackman": + elif window_function == 'Blackman': window_data = window_data * np.blackman(len(window_data)) fft_result = np.fft.fft(window_data) frequencies = np.fft.fftfreq(window_data.shape[0], 1 / fps) magnitude = np.abs(fft_result) - top_k_frequency = frequencies[np.argsort(magnitude)[-(k + 1) : -1]] - results[right - 1][time_window_cnt] = top_k_frequency[0] + top_k_frequency = frequencies[np.argsort(magnitude)[-(k + 1):-1]] + results[right-1][time_window_cnt] 
= top_k_frequency[0] return results + @staticmethod - @njit("(float32[:], float32[:])") - def kendall_tau(sample_1: np.ndarray, sample_2: np.ndarray) -> Tuple[float, float]: + @njit('(float32[:], float32[:])') + def kendall_tau(sample_1: np.ndarray, + sample_2: np.ndarray) -> Tuple[float, float]: + """ Jitted Kendall Tau (rank correlation coefficient). Non-parametric method for computing correlation between two time-series features. Returns tau and associated z-score. @@ -1393,63 +1186,43 @@ def kendall_tau(sample_1: np.ndarray, sample_2: np.ndarray) -> Tuple[float, floa rnks = np.argsort(sample_1) s1_rnk, s2_rnk = sample_1[rnks], sample_2[rnks] - cncrdnt_cnts, dscrdnt_cnts = np.full((s1_rnk.shape[0] - 1), np.nan), np.full( - (s1_rnk.shape[0] - 1), np.nan - ) + cncrdnt_cnts, dscrdnt_cnts = np.full((s1_rnk.shape[0] - 1), np.nan), np.full((s1_rnk.shape[0] - 1), np.nan) for i in range(s2_rnk.shape[0] - 1): - cncrdnt_cnts[i] = ( - np.argwhere(s2_rnk[i + 1 :] > s2_rnk[i]).flatten().shape[0] - ) - dscrdnt_cnts[i] = ( - np.argwhere(s2_rnk[i + 1 :] < s2_rnk[i]).flatten().shape[0] - ) - t = (np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts)) / ( - np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts) - ) - z = ( - 3 - * t - * (np.sqrt(s1_rnk.shape[0] * (s1_rnk.shape[0] - 1))) - / np.sqrt(2 * ((2 * s1_rnk.shape[0]) + 5)) - ) + cncrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] > s2_rnk[i]).flatten().shape[0] + dscrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] < s2_rnk[i]).flatten().shape[0] + t = (np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts)) / (np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts)) + z = 3 * t * (np.sqrt(s1_rnk.shape[0] * (s1_rnk.shape[0] - 1))) / np.sqrt(2 * ((2 * s1_rnk.shape[0]) + 5)) return t, z + @staticmethod - @njit("(float32[:], float32[:], float64[:], int64)") - def sliding_kendall_tau( - sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: float - ) -> np.ndarray: + @njit('(float32[:], float32[:], float64[:], int64)') + def sliding_kendall_tau(sample_1: np.ndarray, + sample_2: np.ndarray, + time_windows: np.ndarray, + fps: float) -> np.ndarray: + results = np.full((sample_1.shape[0], time_windows.shape[0]), 0.0) for time_window_cnt in range(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for left, right in zip( - range(0, sample_1.shape[0] + 1), - range(window_size, sample_1.shape[0] + 1), - ): - sample_1, sample_2 = sample_1[left:right], sample_2[left:right] - rnks = np.argsort(sample_1) - s1_rnk, s2_rnk = sample_1[rnks], sample_2[rnks] - cncrdnt_cnts, dscrdnt_cnts = np.full( - (s1_rnk.shape[0] - 1), np.nan - ), np.full((s1_rnk.shape[0] - 1), np.nan) + for left, right in zip(range(0, sample_1.shape[0] + 1), range(window_size, sample_1.shape[0] + 1)): + sliced_sample_1, sliced_sample_2 = sample_1[left:right], sample_2[left:right] + rnks = np.argsort(sliced_sample_1) + s1_rnk, s2_rnk = sliced_sample_1[rnks], sliced_sample_2[rnks] + cncrdnt_cnts, dscrdnt_cnts = np.full((s1_rnk.shape[0] - 1), np.nan), np.full((s1_rnk.shape[0] - 1), np.nan) for i in range(s2_rnk.shape[0] - 1): - cncrdnt_cnts[i] = ( - np.argwhere(s2_rnk[i + 1 :] > s2_rnk[i]).flatten().shape[0] - ) - dscrdnt_cnts[i] = ( - np.argwhere(s2_rnk[i + 1 :] < s2_rnk[i]).flatten().shape[0] - ) - results[right][time_window_cnt] = ( - np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts) - ) / (np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts)) + cncrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] > s2_rnk[i]).flatten().shape[0] + dscrdnt_cnts[i] = np.argwhere(s2_rnk[i + 1:] < s2_rnk[i]).flatten().shape[0] + 
results[right][time_window_cnt] = (np.sum(cncrdnt_cnts) - np.sum(dscrdnt_cnts)) / (np.sum(cncrdnt_cnts) + np.sum(dscrdnt_cnts)) return results @staticmethod - def local_outlier_factor( - data: np.ndarray, k: Union[int, float] = 5, contamination: float = 1e-10 - ) -> np.ndarray: + def local_outlier_factor(data: np.ndarray, + k: Union[int, float] = 5, + contamination: float = 1e-10) -> np.ndarray: + """ Compute the local outlier factor of each observation. @@ -1478,9 +1251,7 @@ def local_outlier_factor( @staticmethod @jit(nopython=True) - def _hbos_compute( - data: np.ndarray, histograms: typed.Dict, histogram_edges: typed.Dict - ) -> np.ndarray: + def _hbos_compute(data: np.ndarray, histograms: typed.Dict, histogram_edges: typed.Dict) -> np.ndarray: """ Jitted helper to compute Histogram-based Outlier Score (HBOS) called by ``simba.mixins.statistics_mixin.Statistics.hbos``. @@ -1506,13 +1277,10 @@ def _hbos_compute( results[i] = score return results - def hbos( - self, - data: np.ndarray, - bucket_method: Literal[ - "fd", "doane", "auto", "scott", "stone", "rice", "sturges", "sqrt" - ] = "auto", - ): + def hbos(self, + data: np.ndarray, + bucket_method: Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt'] = 'auto'): + """ Jitted compute of Histogram-based Outlier Scores (HBOS). HBOS quantifies the abnormality of data points based on the densities of their feature values within their respective buckets over all feature values. @@ -1521,9 +1289,8 @@ def hbos( :parameter Literal bucket_method: Estimator determining optimal bucket count and bucket width. Default: The maximum of the Sturges and Freedman-Diaconis estimators. :return np.ndarray: Array of size data.shape[0] representing outlier scores, with higher values representing greater outliers. - .. image:: _static/img/hbos.png - :width: 600 + :width: 800 :align: center :example: @@ -1535,31 +1302,84 @@ def hbos( min_vals, max_vals = np.min(data, axis=0), np.max(data, axis=0) data = (data - min_vals) / (max_vals - min_vals) * (1 - 0) + 0 - histogram_edges = typed.Dict.empty( - key_type=types.int64, value_type=types.float64[:] - ) + histogram_edges = typed.Dict.empty(key_type=types.int64, value_type=types.float64[:]) histograms = typed.Dict.empty(key_type=types.int64, value_type=types.int64[:]) for i in range(data.shape[1]): bin_width, bin_count = bucket_data(data=data, method=bucket_method) - histograms[i] = self._hist_1d( - data=data, - bin_count=bin_count, - range=np.array([0, int(bin_width * bin_count)]), - ) - histogram_edges[i] = np.arange(0, 1 + bin_width, bin_width).astype( - np.float64 - ) - - results = self._hbos_compute( - data=data, histograms=histograms, histogram_edges=histogram_edges - ) + histograms[i] = self._hist_1d(data=data, bin_count=bin_count, range=np.array([0, int(bin_width * bin_count)])) + histogram_edges[i] = np.arange(0, 1 + bin_width, bin_width).astype(np.float64) + + results = self._hbos_compute(data=data, histograms=histograms, histogram_edges=histogram_edges) + return results + + def rolling_shapiro_wilks(self, + data: np.ndarray, + time_window: float, + fps: int) -> np.ndarray: + """ + Compute Shapiro-Wilks normality statistics for sequentially binned values in a time-series. E.g., compute + the normality statistics of ``Feature N`` in each window of ``time_window`` seconds. + + :parameter ndarray data: 1D array of size len(frames) representing feature values. + :parameter int time_window: The size of the buckets in seconds. + :parameter int fps: Frame-rate of recorded video. 
+        :return np.ndarray: Array of size data.shape[0] with Shapiro-Wilk normality statistics.
+
+        :example:
+        >>> data = np.random.randint(low=0, high=100, size=(200)).astype('float32')
+        >>> results = Statistics().rolling_shapiro_wilks(data=data, time_window=1, fps=30)
+        """
+
+        window_size, results = int(time_window * fps), np.full((data.shape[0]), -1.0)
+        data = np.split(data, list(range(window_size, data.shape[0], window_size)))
+        for cnt, i in enumerate(prange(1, len(data))):
+            start, end = int((cnt + 1) * window_size), int(((cnt + 1) * window_size) + window_size)
+            results[start:end] = stats.shapiro(data[i])[0]
         return results
 
 
-sample_1 = np.random.random_integers(low=1, high=2, size=(10, 50)).astype(np.float64)
-sample_2 = np.random.random_integers(low=7, high=20, size=(2, 50)).astype(np.float64)
-data = np.vstack([sample_1, sample_2])
-Statistics().hbos(data=data)
+
+
+# data_sizes = [5, 10, 100, 1000, 10000]
+# runs = 4
+#
+# # data_sizes = [1]
+# # runs = 1
+# import pickle
+# for i in range(1, runs+1):
+#     print(i)
+#     for j in data_sizes:
+#         data = np.random.random_integers(0, 100, (j, 400))
+#         start = time.time()
+#         results = Statistics().hbos(data=data)
+#         print(time.time() - start)
+#
+#
+
+
+
+# for i in data_sizes:
+#
+#     data = np.hstack([data_1, data_2])
+
+
+
+
+#
+
+
+
+
+
+
+
+
+
+
+# sample_1 = np.random.random_integers(low=1, high=2, size=(10, 50)).astype(np.float64)
+# sample_2 = np.random.random_integers(low=7, high=20, size=(2, 50)).astype(np.float64)
+# data = np.vstack([sample_1, sample_2])
+# Statistics().hbos(data=data)
 
 # @staticmethod
 # def polyfit(data:np.ndarray,
@@ -1572,6 +1392,10 @@ def hbos(
 #     y = Statistics().polyfit(data=data, deg=1)
 
 
+
+
+
+
 # # @jit(nopython=True)
 # # def fft(data: np.ndarray):
 # #     y = np.full(1, dtype=np.complex128, fill_value=np.nan)
@@ -1597,6 +1421,11 @@ def hbos(
 #     print(time.time() - start)
 
 
+
+
+
+
+
 # #
 # # Statistics().rolling_one_way_anova(data=data, time_windows=np.array([0.5]), fps=10)
@@ -1607,7 +1436,7 @@ def hbos(
 # test = Statistics().rolling_independent_sample_t(data=data, time_window=1, fps=10)
 
 
-# sm.tsa.stattools.adfuller(data, maxlag=None, regression='c', autolag='AIC', store=False, regresults=False)
+#sm.tsa.stattools.adfuller(data, maxlag=None, regression='c', autolag='AIC', store=False, regresults=False)
 # sm.tsa.stattools.zivot_andrews(data, maxlag=None, regression='c', autolag=None)
 # #
@@ -1616,11 +1445,11 @@ def hbos(
 # start = time.time()
 # sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float32)
-# data = np.random.randint(0, 4, (200)).astype(np.float32)
+#data = np.random.randint(0, 4, (200)).astype(np.float32)
 # Statistics().pearsons_r(sample_1=sample_1, sample_2=sample_2)
 # print(time.time() - start)
 
-# results = Statistics().rolling_mann_whitney(data=data, time_windows=np.array([.5]), fps=2.0)
+#results = Statistics().rolling_mann_whitney(data=data, time_windows=np.array([.5]), fps=2.0)
 
 # import pickle
@@ -1647,6 +1476,7 @@ def hbos(
 #     print(time.time() - start)
 
+
 # sample_1 = np.array([5, 2, 3, 4, 5.2])
 # sample_2 = np.array([2, 3, 4.1, 5, 6.1])
 #
@@ -1674,10 +1504,10 @@ def hbos(
 # result = Statistics().levenes(sample_1=sample_1, sample_2=sample_2, critical_values=critical_values)
 # print(result)
 
-# 12.63909108903254
+#12.63909108903254
 
 
-# stats.levene(sample_1, sample_2)
+#stats.levene(sample_1, sample_2)
 
 # sample_1 = np.array([7, 2, 9, 4, 5, 6, 7, 8, 9]).astype(np.float64)
 # sample_2 = np.array([1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5]).astype(np.float64)
@@ -1708,7 +1538,7 @@ def hbos(
 # data =
np.random.randint(0, 50, (20)).astype(np.float64) -# critical_values = pickle.load(open( "/Users/simon/Desktop/envs/simba_dev/simba/assets/lookups/critical_values_05.pickle","rb")) #['independent_t_test']['one_tail'].values +#critical_values = pickle.load(open( "/Users/simon/Desktop/envs/simba_dev/simba/assets/lookups/critical_values_05.pickle","rb")) #['independent_t_test']['one_tail'].values # test = Statistics().sliding_independent_samples_t(data=data, time_window=1, fps=2, critical_values=critical_values, slide_time=1) # data = np.random.randint(0, 5, (200)).astype(np.float64) @@ -1725,3 +1555,5 @@ def hbos( # for i in range(1000000): # test = Statistics().levenes(sample_1=sample_1, sample_2=sample_2) # print(time.time() - start) + +
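+# Illustrative usage of the two outlier/normality methods touched above (kept commented out,
+# matching the scratch examples in this module; the array shapes, fps, and time_window values
+# below are arbitrary assumptions rather than SimBA defaults):
+# data = np.random.randint(0, 100, (1000, 4)).astype(np.float64)
+# hbos_scores = Statistics().hbos(data=data, bucket_method='auto')
+# feature = np.random.randint(0, 100, (200,)).astype('float32')
+# sw_stats = Statistics().rolling_shapiro_wilks(data=feature, time_window=1.0, fps=30)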