From eb82c4cbe64148ea98c0625f79835a08dbd394d4 Mon Sep 17 00:00:00 2001 From: Simon Nilsson Date: Fri, 13 Oct 2023 09:29:19 -0400 Subject: [PATCH] Add files via upload --- simba/mixins/timeseries_features_mixin.py | 277 +++++++++++++--------- 1 file changed, 159 insertions(+), 118 deletions(-) diff --git a/simba/mixins/timeseries_features_mixin.py b/simba/mixins/timeseries_features_mixin.py index fec431190..5c8a7017c 100644 --- a/simba/mixins/timeseries_features_mixin.py +++ b/simba/mixins/timeseries_features_mixin.py @@ -1,19 +1,28 @@ -import numpy as np -from numba import njit, prange, types +from numba import njit, prange, types, typed from numba.typed import List - +import numpy as np +try: + from typing import Literal +except: + from typing_extensions import Literal class TimeseriesFeatureMixin(object): """ - Time-series methods focusing on signal complexity in sliding windows. + Time-series methods focused on signal complexity in sliding windows. + + References + ---------- + .. [1] `cesium `_. + .. [2] `eeglib `_. + .. [3] `antropy `_. """ def __init__(self): pass @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def hjort_parameters(data: np.ndarray): """ Jitted compute of Hjorth parameters for a given time series data. Hjorth parameters describe @@ -54,10 +63,10 @@ def diff(x): return activity, mobility, complexity @staticmethod - @njit("(float32[:], float64[:], int64)") - def sliding_hjort_parameters( - data: np.ndarray, window_sizes: np.ndarray, sample_rate: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], int64)') + def sliding_hjort_parameters(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: """ Jitted compute of Hjorth parameters, including mobility, complexity, and activity, for sliding windows of varying sizes applied to the input data array. @@ -74,9 +83,7 @@ def sliding_hjort_parameters( results = np.full((3, data.shape[0], window_sizes.shape[0]), -1.0) for i in prange(window_sizes.shape[0]): window_size = int(window_sizes[i] * sample_rate) - for l, r in zip( - prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1) - ): + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): sample = data[l:r] dx = sample[1:] - sample[:-1] ddx = dx[1:] - dx[:-1] @@ -92,7 +99,7 @@ def sliding_hjort_parameters( return results.astype(np.float32) @staticmethod - @njit("(float32[:], boolean)") + @njit('(float32[:], boolean)') def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray: """ Jitted compute of the local maxima or minima defined as values which are higher or lower than immediately preceding and proceeding time-series neighbors, repectively. @@ -114,7 +121,6 @@ def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray: :align: center """ - if not maxima: data = -data results = np.full((data.shape[0], 2), -1.0) @@ -138,7 +144,7 @@ def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray: return results[np.argwhere(results[:, 0].T != -1).flatten()] @staticmethod - @njit("(float32[:], float64)") + @njit('(float32[:], float64)') def crossings(data: np.ndarray, val: float) -> int: """ Jitted compute of the count in time-series where sequential values crosses a defined value. @@ -171,10 +177,11 @@ def crossings(data: np.ndarray, val: float) -> int: return cnt @staticmethod - @njit("(float32[:], float64, float64[:], int64,)") - def sliding_crossings( - data: np.ndarray, val: float, window_sizes: np.ndarray, sample_rate: int - ) -> np.ndarray: + @njit('(float32[:], float64, float64[:], int64,)') + def sliding_crossings(data: np.ndarray, + val: float, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: """ Compute the number of crossings over sliding windows in a data array. @@ -191,9 +198,7 @@ def sliding_crossings( results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) for i in prange(window_sizes.shape[0]): window_size = int(window_sizes[i] * sample_rate) - for l, r in zip( - prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1) - ): + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): sample = data[l:r] cnt, last_val = 0, -1 if sample[0] > val: @@ -210,10 +215,8 @@ def sliding_crossings( return results.astype(np.int32) @staticmethod - @njit("(float32[:], int64, int64, )", cache=True, fastmath=True) - def percentile_difference( - data: np.ndarray, upper_pct: int, lower_pct: int - ) -> float: + @njit('(float32[:], int64, int64, )', cache=True, fastmath=True) + def percentile_difference(data: np.ndarray, upper_pct: int, lower_pct: int) -> float: """ Jitted compute of the difference between the ``upper`` and ``lower`` percentiles of the data as a percentage of the median value. @@ -237,20 +240,16 @@ def percentile_difference( """ - upper_val, lower_val = np.percentile(data, upper_pct), np.percentile( - data, lower_pct - ) + upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct) return np.abs(upper_val - lower_val) / np.median(data) @staticmethod - @njit("(float32[:], int64, int64, float64[:], int64, )", cache=True, fastmath=True) - def sliding_percentile_difference( - data: np.ndarray, - upper_pct: int, - lower_pct: int, - window_sizes: np.ndarray, - sample_rate: int, - ) -> np.ndarray: + @njit('(float32[:], int64, int64, float64[:], int64, )', cache=True, fastmath=True) + def sliding_percentile_difference(data: np.ndarray, + upper_pct: int, + lower_pct: int, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: """ Jitted computes the difference between the upper and lower percentiles within a sliding window for each position in the time series using various window sizes. It returns a 2D array where each row corresponds to a position in the time series, @@ -267,13 +266,9 @@ def sliding_percentile_difference( results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) for i in prange(window_sizes.shape[0]): window_size = int(window_sizes[i] * sample_rate) - for l, r in zip( - prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1) - ): + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): sample = data[l:r] - upper_val, lower_val = np.percentile(sample, upper_pct), np.percentile( - sample, lower_pct - ) + upper_val, lower_val = np.percentile(sample, upper_pct), np.percentile(sample, lower_pct) median = np.median(sample) if median != 0: results[r - 1, i] = np.abs(upper_val - lower_val) / median @@ -283,7 +278,7 @@ def sliding_percentile_difference( return results.astype(np.float32) @staticmethod - @njit("(float32[:], float64,)", cache=True, fastmath=True) + @njit('(float32[:], float64,)', cache=True, fastmath=True) def percent_beyond_n_std(data: np.ndarray, n: float) -> float: """ Jitted compute of the ratio of values in time-series more than N standard deviations from the mean of the time-series. @@ -310,10 +305,12 @@ def percent_beyond_n_std(data: np.ndarray, n: float) -> float: return np.argwhere(np.abs(data) > target).shape[0] / data.shape[0] @staticmethod - @njit("(float32[:], float64, float64[:], int64,)", cache=True, fastmath=True) - def sliding_percent_beyond_n_std( - data: np.ndarray, n: float, window_sizes: np.ndarray, sample_rate: int - ) -> np.ndarray: + @njit('(float32[:], float64, float64[:], int64,)', cache=True, fastmath=True) + def sliding_percent_beyond_n_std(data: np.ndarray, + n: float, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + """ Computed the percentage of data points that exceed 'n' standard deviations from the mean for each position in the time series using various window sizes. It returns a 2D array where each row corresponds to a position in the time series, @@ -330,18 +327,14 @@ def sliding_percent_beyond_n_std( target = (np.std(data) * n) + np.mean(data) for i in prange(window_sizes.shape[0]): window_size = int(window_sizes[i] * sample_rate) - for l, r in zip( - prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1) - ): + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): sample = data[l:r] - results[r - 1, i] = ( - np.argwhere(np.abs(sample) > target).shape[0] / sample.shape[0] - ) + results[r - 1, i] = np.argwhere(np.abs(sample) > target).shape[0] / sample.shape[0] return results.astype(np.float32) @staticmethod - @njit("(float32[:], int64, int64, )", cache=True, fastmath=True) + @njit('(float32[:], int64, int64, )', cache=True, fastmath=True) def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: int): """ Jitted compute of the ratio of values in time-series that fall between the ``upper`` and ``lower`` percentile. @@ -365,23 +358,16 @@ def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: in :align: center """ - upper_val, lower_val = np.percentile(data, upper_pct), np.percentile( - data, lower_pct - ) - return ( - np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0] - / data.shape[0] - ) + upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct) + return np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0] / data.shape[0] @staticmethod - @njit("(float32[:], int64, int64, float64[:], int64)", cache=True, fastmath=True) - def sliding_percent_in_percentile_window( - data: np.ndarray, - upper_pct: int, - lower_pct: int, - window_sizes: np.ndarray, - sample_rate: int, - ): + @njit('(float32[:], int64, int64, float64[:], int64)', cache=True, fastmath=True) + def sliding_percent_in_percentile_window(data: np.ndarray, + upper_pct: int, + lower_pct: int, + window_sizes: np.ndarray, + sample_rate: int): """ Jitted compute of the percentage of data points falling within a percentile window in a sliding manner. @@ -398,26 +384,17 @@ def sliding_percent_in_percentile_window( """ results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) - upper_val, lower_val = np.percentile(data, upper_pct), np.percentile( - data, lower_pct - ) + upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct) for i in prange(window_sizes.shape[0]): window_size = int(window_sizes[i] * sample_rate) - for l, r in zip( - prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1) - ): + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): sample = data[l:r] - results[r - 1, i] = ( - np.argwhere((sample <= upper_val) & (sample >= lower_val)) - .flatten() - .shape[0] - / sample.shape[0] - ) + results[r - 1, i] = np.argwhere((sample <= upper_val) & (sample >= lower_val)).flatten().shape[0] / sample.shape[0] return results.astype(np.float32) @staticmethod - @njit("(float32[:],)", fastmath=True, cache=True) + @njit('(float32[:],)', fastmath=True, cache=True) def petrosian_fractal_dimension(data: np.ndarray) -> float: """ Calculate the Petrosian Fractal Dimension (PFD) of a given time series data. The PFD is a measure of the @@ -461,16 +438,13 @@ def petrosian_fractal_dimension(data: np.ndarray) -> float: if zC == 0: return -1.0 - return np.log10(data.shape[0]) / ( - np.log10(data.shape[0]) - + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC)) - ) + return np.log10(data.shape[0]) / (np.log10(data.shape[0]) + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC))) @staticmethod - @njit("(float32[:], float64[:], int64)", fastmath=True, cache=True) - def sliding_petrosian_fractal_dimension( - data: np.ndarray, window_sizes: np.ndarray, sample_rate: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], int64)', fastmath=True, cache=True) + def sliding_petrosian_fractal_dimension(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: """ Jitted compute of Petrosian Fractal Dimension over sliding windows in a data array. @@ -488,12 +462,8 @@ def sliding_petrosian_fractal_dimension( results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) for i in prange(window_sizes.shape[0]): window_size = int(window_sizes[i] * sample_rate) - for l, r in zip( - prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1) - ): - sample = (data[l:r] - np.min(data[l:r])) / ( - np.max(data[l:r]) - np.min(data[l:r]) - ) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = (data[l:r] - np.min(data[l:r])) / (np.max(data[l:r]) - np.min(data[l:r])) derivative = sample[1:] - sample[:-1] if derivative.shape[0] == 0: results[r - 1, i] = -1.0 @@ -511,15 +481,13 @@ def sliding_petrosian_fractal_dimension( if zC == 0: results[r - 1, i] = -1.0 else: - results[r - 1, i] = np.log10(sample.shape[0]) / ( - np.log10(sample.shape[0]) - + np.log10(sample.shape[0] / (sample.shape[0] + 0.4 * zC)) - ) + results[r - 1, i] = np.log10(sample.shape[0]) / (np.log10(sample.shape[0]) + np.log10( + sample.shape[0] / (sample.shape[0] + 0.4 * zC))) return results.astype(np.float32) @staticmethod - @njit("(float32[:], int64)") + @njit('(float32[:], int64)') def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10): """ Jitted compute of the Higuchi Fractal Dimension of a given time series data. The Higuchi Fractal Dimension provides a measure of the fractal @@ -550,12 +518,8 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10): """ L, N = np.zeros(kmax - 1), len(data) - x = np.hstack( - ( - -np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32), - np.ones(kmax - 1).reshape(-1, 1).astype(np.float32), - ) - ) + x = np.hstack((-np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32), + np.ones(kmax - 1).reshape(-1, 1).astype(np.float32))) for k in prange(2, kmax + 1): Lk = np.zeros(k) for m in range(0, k): @@ -570,8 +534,9 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10): return np.linalg.lstsq(x, L.astype(np.float32))[0][0] @staticmethod - @njit("(float32[:], int64, int64,)", fastmath=True) + @njit('(float32[:], int64, int64,)', fastmath=True) def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float: + """ Calculate the permutation entropy of a time series. @@ -633,7 +598,7 @@ def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float: return -np.sum(probs * np.log(probs)) @staticmethod - @njit("(float32[:],)", fastmath=True) + @njit('(float32[:],)', fastmath=True) def line_length(data: np.ndarray) -> float: """ Calculate the line length of a 1D array. @@ -664,10 +629,11 @@ def line_length(data: np.ndarray) -> float: return np.sum(diff[1:]) @staticmethod - @njit("(float32[:], float64[:], int64)", fastmath=True) - def sliding_line_length( - data: np.ndarray, window_sizes: np.ndarray, sample_rate: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], int64)', fastmath=True) + def sliding_line_length(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + """ Jitted compute of sliding line length for a given time series using different window sizes. @@ -688,13 +654,88 @@ def sliding_line_length( results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) for i in prange(window_sizes.shape[0]): window_size = int(window_sizes[i] * sample_rate) - for l, r in zip( - prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1) - ): + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): sample = data[l:r] results[r - 1, i] = np.sum(np.abs(np.diff(sample.astype(np.float64)))) return results.astype(np.float32) + + @staticmethod + @njit('(float32[:], float64[:], int64)', fastmath=True, cache=True) + def sliding_variance(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int): + """ + Jitted compute of the variance of data within sliding windows of varying sizes applied to + the input data array. Variance is a measure of data dispersion or spread. + + :param data: 1d input data array. + :param window_sizes: Array of window sizes (in seconds). + :param sample_rate: Sampling rate of the data in samples per second. + :return: Variance values for each window size and data point. The shape of the result array is (data.shape[0], window_sizes.shape[0]). + """ + + results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = (data[l:r] - np.min(data[l:r])) / (np.max(data[l:r]) - np.min(data[l:r])) + results[r - 1, i] = np.var(sample) + return results.astype(np.float32) + + @staticmethod + @njit('(float32[:], float64[:], int64, types.ListType(types.unicode_type))', fastmath=True, cache=True) + def sliding_descriptive_statistics(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int, + statistics: Literal['var', 'max', 'min', 'std', 'median', 'mean', 'mad']): + + """ + Jitted compute of descriptive statistics over sliding windows in 1D data array. + + Computes various descriptive statistics (e.g., variance, maximum, minimum, standard deviation, + median, mean, median absolute deviation) for sliding windows of varying sizes applied to the input data array. + + :param np.ndarray data : 1D input data array. + :param np.ndarray window_sizes: Array of window sizes (in seconds). + :param int sample_rate: Sampling rate of the data in samples per second. + :param types.ListType(types.unicode_type) statistics:: List of statistics to compute. Options: 'var', 'max', 'min', 'std', 'median', 'mean', 'mad'. + :return np.ndarray Array containing the selected descriptive statistics for each window size, data point, and statistic type. The shape of the result array is (len(statistics), data.shape[0], window_sizes.shape[0). + + .. note:: + - The `statistics` parameter should be a list containing one or more of the following statistics: + 'var' (variance), 'max' (maximum), 'min' (minimum), 'std' (standard deviation), 'median' (median), + 'mean' (mean), 'mad' (median absolute deviation). + - If the statistics list is ['var', 'max', 'mean'], the + 3rd dimension order in the result array will be: [variance, maximum, mean] + + :example: + >>> data = np.array([1, 4, 2, 3, 5, 6, 8, 7, 9, 10]).astype(np.float32) + >>> results = TimeseriesFeatureMixin().sliding_descriptive_statistics(data=data, window_sizes=np.array([1.0, 5.0]), sample_rate=2, statistics=typed.List(['var', 'max'])) + """ + + results = np.full((len(statistics), data.shape[0], window_sizes.shape[0]), -1.0) + for j in prange(len(statistics)): + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = data[l:r] + if statistics[j] == 'var': + results[j, r - 1, i] = np.var(sample) + elif statistics[j] == 'max': + results[j, r - 1, i] = np.max(sample) + elif statistics[j] == 'min': + results[j, r - 1, i] = np.min(sample) + elif statistics[j] == 'std': + results[j, r - 1, i] = np.std(sample) + elif statistics[j] == 'median': + results[j, r - 1, i] = np.median(sample) + elif statistics[j] == 'mean': + results[j, r - 1, i] = np.mean(sample) + elif statistics[j] == 'mad': + results[j, r - 1, i] = np.median(np.abs(sample - np.median(sample))) + # + return results.astype(np.float32) # t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False) # sine_wave = 1.0 * np.sin(2 * np.pi * 1.0 * t).astype(np.float32) @@ -702,4 +743,4 @@ def sliding_line_length( # #1.0000398187022719 # np.random.shuffle(sine_wave) # TimeseriesFeatureMixin().petrosian_fractal_dimension(data=sine_wave) -# #1.0211625348743218 +# #1.0211625348743218 \ No newline at end of file