diff --git a/simba/mixins/timeseries_features_mixin.py b/simba/mixins/timeseries_features_mixin.py
index 1e8f3ee4e..86be41578 100644
--- a/simba/mixins/timeseries_features_mixin.py
+++ b/simba/mixins/timeseries_features_mixin.py
@@ -1,6 +1,6 @@
+from numba import njit, prange, types
+from numba.typed import List
 import numpy as np
-from numba import njit, prange
-
 
 
 class TimeseriesFeatureMixin(object):
@@ -12,7 +12,7 @@ def __init__(self):
         pass
 
     @staticmethod
-    @njit("(float32[:],)")
+    @njit('(float32[:],)')
     def hjort_parameters(data: np.ndarray):
         """
         Jitted compute of Hjorth parameters for a given time series. Hjorth parameters describe
@@ -53,7 +53,7 @@ def diff(x):
         return activity, mobility, complexity
 
     @staticmethod
-    @njit("(float32[:], boolean)")
+    @njit('(float32[:], boolean)')
     def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray:
         """
         Jitted compute of the local maxima or minima, defined as values which are higher or lower than their immediately preceding and succeeding time-series neighbors, respectively.
@@ -99,7 +99,7 @@ def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray:
         return results[np.argwhere(results[:, 0].T != -1).flatten()]
 
     @staticmethod
-    @njit("(float32[:], float64)")
+    @njit('(float32[:], float64)')
     def crossings(data: np.ndarray, val: float) -> int:
         """
         Jitted compute of the number of times sequential values in the time series cross a defined value.
@@ -132,10 +132,8 @@ def crossings(data: np.ndarray, val: float) -> int:
         return cnt
 
     @staticmethod
-    @njit("(float32[:], int64, int64, )", cache=True, fastmath=True)
-    def percentile_difference(
-        data: np.ndarray, upper_pct: int, lower_pct: int
-    ) -> float:
+    @njit('(float32[:], int64, int64, )', cache=True, fastmath=True)
+    def percentile_difference(data: np.ndarray, upper_pct: int, lower_pct: int) -> float:
         """
         Jitted compute of the difference between the ``upper`` and ``lower`` percentiles of the data as
         a percentage of the median value.
@@ -159,13 +157,11 @@ def percentile_difference(
         """
 
-        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(
-            data, lower_pct
-        )
+        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct)
         return np.abs(upper_val - lower_val) / np.median(data)
 
     @staticmethod
-    @njit("(float32[:], float64,)", cache=True, fastmath=True)
+    @njit('(float32[:], float64,)', cache=True, fastmath=True)
     def percent_beyond_n_std(data: np.ndarray, n: float) -> float:
         """
         Jitted compute of the ratio of values in a time series that are more than N standard deviations
         from the mean of the time series.
@@ -193,7 +189,7 @@ def percent_beyond_n_std(data: np.ndarray, n: float) -> float:
         return np.argwhere(np.abs(data) > target).shape[0] / data.shape[0]
 
     @staticmethod
-    @njit("(float32[:], int64, int64, )", cache=True, fastmath=True)
+    @njit('(float32[:], int64, int64, )', cache=True, fastmath=True)
     def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: int):
         """
         Jitted compute of the ratio of values in a time series that fall between the ``upper`` and ``lower`` percentile.
@@ -217,16 +213,11 @@ def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: int):
            :align: center
         """
 
-        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(
-            data, lower_pct
-        )
-        return (
-            np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0]
-            / data.shape[0]
-        )
+        upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct)
+        return np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0] / data.shape[0]
 
     @staticmethod
-    @njit("(float32[:],)", fastmath=True, cache=True)
+    @njit('(float32[:],)', fastmath=True, cache=True)
     def petrosian_fractal_dimension(data: np.ndarray) -> float:
         """
         Calculate the Petrosian Fractal Dimension (PFD) of a given time series. The PFD is a measure of the
@@ -270,13 +261,10 @@ def petrosian_fractal_dimension(data: np.ndarray) -> float:
         if zC == 0:
             return -1.0
 
-        return np.log10(data.shape[0]) / (
-            np.log10(data.shape[0])
-            + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC))
-        )
+        return np.log10(data.shape[0]) / (np.log10(data.shape[0]) + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC)))
 
     @staticmethod
-    @njit("(float32[:], int64)")
+    @njit('(float32[:], int64)')
     def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
         """
         Jitted compute of the Higuchi Fractal Dimension of a given time series. The Higuchi Fractal Dimension provides a measure of the fractal
@@ -307,12 +295,8 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
         """
 
         L, N = np.zeros(kmax - 1), len(data)
-        x = np.hstack(
-            (
-                -np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32),
-                np.ones(kmax - 1).reshape(-1, 1).astype(np.float32),
-            )
-        )
+        x = np.hstack((-np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32),
+                       np.ones(kmax - 1).reshape(-1, 1).astype(np.float32)))
         for k in prange(2, kmax + 1):
             Lk = np.zeros(k)
             for m in range(0, k):
@@ -326,6 +310,69 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
 
         return np.linalg.lstsq(x, L.astype(np.float32))[0][0]
 
+    @staticmethod
+    @njit('(float32[:], int64, int64,)', fastmath=True)
+    def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float:
+        """
+        Calculate the permutation entropy of a time series.
+
+        Permutation entropy is a measure of the complexity of a time series, quantifying the
+        irregularity and unpredictability of its order patterns. It is computed from the
+        frequency of unique order patterns of a given dimension in the time series.
+
+        The permutation entropy (PE) is calculated using the following formula:
+
+        .. math::
+           PE = -\\sum_i p_i \\log(p_i)
+
+        where:
+           - :math:`PE` is the permutation entropy.
+           - :math:`p_i` is the probability of each unique order pattern.
+
+        :param numpy.ndarray data: The time series data for which permutation entropy is calculated.
+        :param int dimension: The length of the order patterns to be considered (embedding dimension).
+        :param int delay: Time delay between elements in an order pattern.
+        :return float: The permutation entropy of the time series. A higher value indicates higher complexity and unpredictability in the time series.
+
+        :example:
+        >>> t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False)
+        >>> sine_wave = 1.0 * np.sin(2 * np.pi * 1.0 * t).astype(np.float32)
+        >>> TimeseriesFeatureMixin().permutation_entropy(data=sine_wave, dimension=3, delay=1)
+        0.701970058666407
+        >>> np.random.shuffle(sine_wave)
+        >>> TimeseriesFeatureMixin().permutation_entropy(data=sine_wave, dimension=3, delay=1)
+        1.79172449934604
+        """
+
+        n, permutations, counts = len(data), List(), List()
+        for i in prange(n - (dimension - 1) * delay):
+            indices = np.arange(i, i + dimension * delay, delay)
+            permutation = List(np.argsort(data[indices]))
+            is_unique = True
+            for j in range(len(permutations)):
+                p = permutations[j]
+                if len(p) == len(permutation):
+                    is_equal = True
+                    for k in range(len(p)):
+                        if p[k] != permutation[k]:
+                            is_equal = False
+                            break
+                    if is_equal:
+                        is_unique = False
+                        counts[j] += 1
+                        break
+            if is_unique:
+                permutations.append(permutation)
+                counts.append(1)
+
+        total_permutations = len(permutations)
+        probs = np.empty(total_permutations, dtype=types.float64)
+        for i in prange(total_permutations):
+            probs[i] = counts[i] / (n - (dimension - 1) * delay)
+
+        return -np.sum(probs * np.log(probs))
 
 
 # t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False)
 # sine_wave = 1.0 * np.sin(2 * np.pi * 1.0 * t).astype(np.float32)
@@ -333,4 +380,4 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
 # #1.0000398187022719
 # np.random.shuffle(sine_wave)
 # TimeseriesFeatureMixin().petrosian_fractal_dimension(data=sine_wave)
-# #1.0211625348743218
+# #1.0211625348743218
\ No newline at end of file
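
Reviewer note on the new permutation_entropy method: the jitted implementation tracks unique ordinal patterns with numba typed lists, since dictionaries keyed on arrays are awkward in nopython mode. The same quantity can be cross-checked outside numba with plain NumPy. The sketch below is illustrative only; permutation_entropy_ref is a hypothetical helper, not part of this diff:

import numpy as np

def permutation_entropy_ref(data: np.ndarray, dimension: int, delay: int) -> float:
    # Non-jitted reference implementation for cross-checking the numba version.
    n_patterns = len(data) - (dimension - 1) * delay
    # The ordinal pattern of each embedded vector is the argsort of its `dimension` values.
    patterns = np.array([np.argsort(data[i:i + dimension * delay:delay]) for i in range(n_patterns)])
    # Count each unique ordinal pattern (unique rows) and normalise counts to probabilities.
    _, counts = np.unique(patterns, axis=0, return_counts=True)
    probs = counts / n_patterns
    # Shannon entropy of the ordinal-pattern distribution (natural log, as in the diff).
    return float(-np.sum(probs * np.log(probs)))

t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False)
sine_wave = np.sin(2 * np.pi * t).astype(np.float32)
print(permutation_entropy_ref(sine_wave, dimension=3, delay=1))   # expected ~0.7020, per the docstring example
np.random.shuffle(sine_wave)
print(permutation_entropy_ref(sine_wave, dimension=3, delay=1))   # expected ~log(3!) = 1.7918

Both versions use the natural logarithm, so for dimension=3 the shuffled-series ceiling is log(3!) = 1.7918, which matches the second docstring example.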
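A further usage note: every @njit signature in this file is compiled eagerly for float32 arrays, so callers must cast before dispatch; passing a float64 array raises a numba TypingError. A minimal sketch of calling the static methods directly (illustrative only, no asserted outputs):

import numpy as np
from simba.mixins.timeseries_features_mixin import TimeseriesFeatureMixin

# Cast to float32 first: the eager '(float32[:], ...)' signatures reject float64 input.
data = np.random.default_rng(0).normal(size=1000).astype(np.float32)

activity, mobility, complexity = TimeseriesFeatureMixin.hjort_parameters(data)
n_crossings = TimeseriesFeatureMixin.crossings(data, 0.0)            # val is a float64 scalar, per '(float32[:], float64)'
beyond_2sd = TimeseriesFeatureMixin.percent_beyond_n_std(data, 2.0)  # ratio of values beyond 2 SD of the mean
print(activity, mobility, complexity, n_crossings, beyond_2sd)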