diff --git a/simba/mixins/timeseries_features_mixin.py b/simba/mixins/timeseries_features_mixin.py index d08fd01b2..589c0fd5f 100644 --- a/simba/mixins/timeseries_features_mixin.py +++ b/simba/mixins/timeseries_features_mixin.py @@ -1,19 +1,18 @@ -import numpy as np from numba import njit, prange, types from numba.typed import List - +import numpy as np class TimeseriesFeatureMixin(object): """ - Time-series methods. + Time-series methods focusing on signal complexity in sliding windows. """ def __init__(self): pass @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def hjort_parameters(data: np.ndarray): """ Jitted compute of Hjorth parameters for a given time series data. Hjorth parameters describe @@ -54,7 +53,43 @@ def diff(x): return activity, mobility, complexity @staticmethod - @njit("(float32[:], boolean)") + @njit('(float32[:], float64[:], int64)') + def sliding_hjort_parameters(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + """ + Jitted compute of Hjorth parameters, including mobility, complexity, and activity, for + sliding windows of varying sizes applied to the input data array. + + :param np.ndarray data: Input data array. + :param np.ndarray window_sizes: Array of window sizes (in seconds). + :param int sample_rate: Sampling rate of the data in samples per second. + :return np.ndarray: An array containing Hjorth parameters for each window size and data point. + The shape of the result array is (3, data.shape[0], window_sizes.shape[0]). + The three parameters are stored in the first dimension (0 - mobility, 1 - complexity, + 2 - activity), and the remaining dimensions correspond to data points and window sizes. + + """ + results = np.full((3, data.shape[0], window_sizes.shape[0]), -1.0) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = data[l:r] + dx = sample[1:] - sample[:-1] + ddx = dx[1:] - dx[:-1] + x_var, dx_var = np.var(sample), np.var(dx) + ddx_var = np.var(ddx) + mobility = np.sqrt(dx_var / x_var) + complexity = np.sqrt(ddx_var / dx_var) / mobility + activity = np.var(sample) + results[0, r + 1, i] = mobility + results[1, r + 1, i] = complexity + results[2, r + 1, i] = activity + + return results.astype(np.float32) + + @staticmethod + @njit('(float32[:], boolean)') def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray: """ Jitted compute of the local maxima or minima defined as values which are higher or lower than immediately preceding and proceeding time-series neighbors, repectively. @@ -100,7 +135,7 @@ def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray: return results[np.argwhere(results[:, 0].T != -1).flatten()] @staticmethod - @njit("(float32[:], float64)") + @njit('(float32[:], float64)') def crossings(data: np.ndarray, val: float) -> int: """ Jitted compute of the count in time-series where sequential values crosses a defined value. @@ -133,10 +168,46 @@ def crossings(data: np.ndarray, val: float) -> int: return cnt @staticmethod - @njit("(float32[:], int64, int64, )", cache=True, fastmath=True) - def percentile_difference( - data: np.ndarray, upper_pct: int, lower_pct: int - ) -> float: + @njit('(float32[:], float64, float64[:], int64,)') + def sliding_crossings(data: np.ndarray, + val: float, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + """ + Compute the number of crossings over sliding windows in a data array. + + Computes the number of times a value in the data array crosses a given threshold + value within sliding windows of varying sizes. The number of crossings is computed for each + window size and stored in the result array. + + :param np.ndarray data: Input data array. + :param float val: Threshold value for crossings. + :param np.ndarray window_sizes: Array of window sizes (in seconds). + :param int sample_rate: Sampling rate of the data in samples per second. + :return np.ndarray: An array containing the number of crossings for each window size and data point. The shape of the result array is (data.shape[0], window_sizes.shape[0]). + """ + results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = data[l:r] + cnt, last_val = 0, -1 + if sample[0] > val: + last_val = 1 + for j in prange(1, sample.shape[0]): + current_val = -1 + if sample[j] > val: + current_val = 1 + if last_val != current_val: + cnt += 1 + last_val = current_val + results[r - 1, i] = cnt + + return results.astype(np.int32) + + @staticmethod + @njit('(float32[:], int64, int64, )', cache=True, fastmath=True) + def percentile_difference(data: np.ndarray, upper_pct: int, lower_pct: int) -> float: """ Jitted compute of the difference between the ``upper`` and ``lower`` percentiles of the data as a percentage of the median value. @@ -160,13 +231,45 @@ def percentile_difference( """ - upper_val, lower_val = np.percentile(data, upper_pct), np.percentile( - data, lower_pct - ) + upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct) return np.abs(upper_val - lower_val) / np.median(data) @staticmethod - @njit("(float32[:], float64,)", cache=True, fastmath=True) + @njit('(float32[:], int64, int64, float64[:], int64, )', cache=True, fastmath=True) + def sliding_percentile_difference(data: np.ndarray, + upper_pct: int, + lower_pct: int, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + """ + Jitted computes the difference between the upper and lower percentiles within a sliding window for each position + in the time series using various window sizes. It returns a 2D array where each row corresponds to a position in the time series, + and each column corresponds to a different window size. The results are calculated as the absolute difference between + upper and lower percentiles divided by the median of the window. + + :param np.ndarray data: The input time series data. + :param int upper_pct: The upper percentile value for the window (e.g., 95 for the 95th percentile). + :param int lower_pct: The lower percentile value for the window (e.g., 5 for the 5th percentile). + :param np.ndarray window_sizes: An array of window sizes (in seconds) to use for the sliding calculation. + :param int sample_rate: The sampling rate (samples per second) of the time series data. + :return np.ndarray: A 2D array containing the difference between upper and lower percentiles for each window size. + """ + results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = data[l:r] + upper_val, lower_val = np.percentile(sample, upper_pct), np.percentile(sample, lower_pct) + median = np.median(sample) + if median != 0: + results[r - 1, i] = np.abs(upper_val - lower_val) / median + else: + results[r - 1, i] = -1.0 + + return results.astype(np.float32) + + @staticmethod + @njit('(float32[:], float64,)', cache=True, fastmath=True) def percent_beyond_n_std(data: np.ndarray, n: float) -> float: """ Jitted compute of the ratio of values in time-series more than N standard deviations from the mean of the time-series. @@ -190,11 +293,39 @@ def percent_beyond_n_std(data: np.ndarray, n: float) -> float: """ target = (np.std(data) * n) + np.mean(data) - print(target, np.mean(data), target / 2) return np.argwhere(np.abs(data) > target).shape[0] / data.shape[0] @staticmethod - @njit("(float32[:], int64, int64, )", cache=True, fastmath=True) + @njit('(float32[:], float64, float64[:], int64,)', cache=True, fastmath=True) + def sliding_percent_beyond_n_std(data: np.ndarray, + n: float, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + + """ + Computed the percentage of data points that exceed 'n' standard deviations from the mean for each position in + the time series using various window sizes. It returns a 2D array where each row corresponds to a position in the time series, + and each column corresponds to a different window size. The results are given as a percentage of data points beyond the threshold. + + :param np.ndarray data: The input time series data. + :param float n: The number of standard deviations to determine the threshold. + :param np.ndarray window_sizes: An array of window sizes (in seconds) to use for the sliding calculation. + :param int sample_rate: The sampling rate (samples per second) of the time series data. + :return np.ndarray: A 2D array containing the percentage of data points beyond the specified 'n' standard deviations for each window size. + """ + + results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) + target = (np.std(data) * n) + np.mean(data) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = data[l:r] + results[r - 1, i] = np.argwhere(np.abs(sample) > target).shape[0] / sample.shape[0] + + return results.astype(np.float32) + + @staticmethod + @njit('(float32[:], int64, int64, )', cache=True, fastmath=True) def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: int): """ Jitted compute of the ratio of values in time-series that fall between the ``upper`` and ``lower`` percentile. @@ -218,16 +349,43 @@ def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: in :align: center """ - upper_val, lower_val = np.percentile(data, upper_pct), np.percentile( - data, lower_pct - ) - return ( - np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0] - / data.shape[0] - ) + upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct) + return np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0] / data.shape[0] @staticmethod - @njit("(float32[:],)", fastmath=True, cache=True) + @njit('(float32[:], int64, int64, float64[:], int64)', cache=True, fastmath=True) + def sliding_percent_in_percentile_window(data: np.ndarray, + upper_pct: int, + lower_pct: int, + window_sizes: np.ndarray, + sample_rate: int): + """ + Jitted compute of the percentage of data points falling within a percentile window in a sliding manner. + + The function computes the percentage of data points within the specified percentile window for each position in the time series + using various window sizes. It returns a 2D array where each row corresponds to a position in the time series, and each column + corresponds to a different window size. The results are given as a percentage of data points within the percentile window. + + :param np.ndarray data : The input time series data. + :param int upper_pct: The upper percentile value for the window (e.g., 95 for the 95th percentile). + :param int lower_pct (int): The lower percentile value for the window (e.g., 5 for the 5th percentile). + :param np.ndarray window_sizes: An array of window sizes (in seconds) to use for the sliding calculation. + :param int sample_rate: The sampling rate (samples per second) of the time series data. + :return np.ndarray: A 2D array containing the percentage of data points within the percentile window for each window size. + + """ + results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) + upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = data[l:r] + results[r - 1, i] = np.argwhere((sample <= upper_val) & (sample >= lower_val)).flatten().shape[0] / sample.shape[0] + + return results.astype(np.float32) + + @staticmethod + @njit('(float32[:],)', fastmath=True, cache=True) def petrosian_fractal_dimension(data: np.ndarray) -> float: """ Calculate the Petrosian Fractal Dimension (PFD) of a given time series data. The PFD is a measure of the @@ -271,13 +429,56 @@ def petrosian_fractal_dimension(data: np.ndarray) -> float: if zC == 0: return -1.0 - return np.log10(data.shape[0]) / ( - np.log10(data.shape[0]) - + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC)) - ) + return np.log10(data.shape[0]) / (np.log10(data.shape[0]) + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC))) @staticmethod - @njit("(float32[:], int64)") + @njit('(float32[:], float64[:], int64)', fastmath=True, cache=True) + def sliding_petrosian_fractal_dimension(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + """ + Jitted compute of Petrosian Fractal Dimension over sliding windows in a data array. + + This method computes the Petrosian Fractal Dimension for sliding windows of varying sizes applied + to the input data array. The Petrosian Fractal Dimension is a measure of signal complexity. + + :param np.ndarray data: Input data array. + :param np.ndarray window_sizes: Array of window sizes (in seconds). + :param int sample_rate: Sampling rate of the data in samples per second. + :return np.ndarray: An array containing Petrosian Fractal Dimension values for each window size and data + point. The shape of the result array is (data.shape[0], window_sizes.shape[0]). + + """ + + results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = (data[l:r] - np.min(data[l:r])) / (np.max(data[l:r]) - np.min(data[l:r])) + derivative = sample[1:] - sample[:-1] + if derivative.shape[0] == 0: + results[r - 1, i] = -1.0 + else: + zC, last_val = 0, -1 + if derivative[0] > 0.0: + last_val = 1 + for j in prange(1, derivative.shape[0]): + current_val = -1 + if derivative[j] > 0.0: + current_val = 1 + if last_val != current_val: + zC += 1 + last_val = current_val + if zC == 0: + results[r - 1, i] = -1.0 + else: + results[r - 1, i] = np.log10(sample.shape[0]) / (np.log10(sample.shape[0]) + np.log10( + sample.shape[0] / (sample.shape[0] + 0.4 * zC))) + + return results.astype(np.float32) + + @staticmethod + @njit('(float32[:], int64)') def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10): """ Jitted compute of the Higuchi Fractal Dimension of a given time series data. The Higuchi Fractal Dimension provides a measure of the fractal @@ -308,12 +509,8 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10): """ L, N = np.zeros(kmax - 1), len(data) - x = np.hstack( - ( - -np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32), - np.ones(kmax - 1).reshape(-1, 1).astype(np.float32), - ) - ) + x = np.hstack((-np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32), + np.ones(kmax - 1).reshape(-1, 1).astype(np.float32))) for k in prange(2, kmax + 1): Lk = np.zeros(k) for m in range(0, k): @@ -328,8 +525,9 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10): return np.linalg.lstsq(x, L.astype(np.float32))[0][0] @staticmethod - @njit("(float32[:], int64, int64,)", fastmath=True) + @njit('(float32[:], int64, int64,)', fastmath=True) def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float: + """ Calculate the permutation entropy of a time series. @@ -391,7 +589,7 @@ def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float: return -np.sum(probs * np.log(probs)) @staticmethod - @njit("(float32[:],)", fastmath=True) + @njit('(float32[:],)', fastmath=True) def line_length(data: np.ndarray) -> float: """ Calculate the line length of a 1D array. @@ -421,6 +619,37 @@ def line_length(data: np.ndarray) -> float: diff = np.abs(np.diff(data.astype(np.float64))) return np.sum(diff[1:]) + @staticmethod + @njit('(float32[:], float64[:], int64)', fastmath=True) + def sliding_line_length(data: np.ndarray, + window_sizes: np.ndarray, + sample_rate: int) -> np.ndarray: + + """ + Jitted compute of sliding line length for a given time series using different window sizes. + + The function computes line length for the input data using various window sizes. It returns a 2D array where each row + corresponds to a position in the time series, and each column corresponds to a different window size. The line length + is calculated for each window, and the results are returned as a 2D array of float32 values. + + :param np.ndarray data: 1D array input data. + :param window_sizes: An array of window sizes (in seconds) to use for line length calculation. + :param sample_rate: The sampling rate (samples per second) of the time series data. + :return np.ndarray: A 2D array containing line length values for each window size at each position in the time series. + + :examples: + >>> data = np.array([1, 4, 2, 3, 5, 6, 8, 7, 9, 10]).astype(np.float32) + >>> TimeseriesFeatureMixin().sliding_line_length(data=data, window_sizes=np.array([1.0]), sample_rate=2) + """ + + results = np.full((data.shape[0], window_sizes.shape[0]), -1.0) + for i in prange(window_sizes.shape[0]): + window_size = int(window_sizes[i] * sample_rate) + for l, r in zip(prange(0, data.shape[0] + 1), prange(window_size, data.shape[0] + 1)): + sample = data[l:r] + results[r - 1, i] = np.sum(np.abs(np.diff(sample.astype(np.float64)))) + return results.astype(np.float32) + # t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False) # sine_wave = 1.0 * np.sin(2 * np.pi * 1.0 * t).astype(np.float32) @@ -428,4 +657,4 @@ def line_length(data: np.ndarray) -> float: # #1.0000398187022719 # np.random.shuffle(sine_wave) # TimeseriesFeatureMixin().petrosian_fractal_dimension(data=sine_wave) -# #1.0211625348743218 +# #1.0211625348743218 \ No newline at end of file