Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
sronilsson authored Oct 11, 2023
1 parent fce6323 commit 9e0aa21
Showing 1 changed file with 81 additions and 34 deletions.
115 changes: 81 additions & 34 deletions simba/mixins/timeseries_features_mixin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from numba import njit, prange, types
from numba.typed import List
import numpy as np
from numba import njit, prange


class TimeseriesFeatureMixin(object):

Expand All @@ -12,7 +12,7 @@ def __init__(self):
pass

@staticmethod
@njit("(float32[:],)")
@njit('(float32[:],)')
def hjort_parameters(data: np.ndarray):
"""
Jitted compute of Hjorth parameters for a given time series data. Hjorth parameters describe
Expand Down Expand Up @@ -53,7 +53,7 @@ def diff(x):
return activity, mobility, complexity

@staticmethod
@njit("(float32[:], boolean)")
@njit('(float32[:], boolean)')
def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray:
"""
Jitted compute of the local maxima or minima defined as values which are higher or lower than immediately preceding and proceeding time-series neighbors, repectively.
Expand Down Expand Up @@ -99,7 +99,7 @@ def local_maxima_minima(data: np.ndarray, maxima: bool) -> np.ndarray:
return results[np.argwhere(results[:, 0].T != -1).flatten()]

@staticmethod
@njit("(float32[:], float64)")
@njit('(float32[:], float64)')
def crossings(data: np.ndarray, val: float) -> int:
"""
Jitted compute of the count in time-series where sequential values crosses a defined value.
Expand Down Expand Up @@ -132,10 +132,8 @@ def crossings(data: np.ndarray, val: float) -> int:
return cnt

@staticmethod
@njit("(float32[:], int64, int64, )", cache=True, fastmath=True)
def percentile_difference(
data: np.ndarray, upper_pct: int, lower_pct: int
) -> float:
@njit('(float32[:], int64, int64, )', cache=True, fastmath=True)
def percentile_difference(data: np.ndarray, upper_pct: int, lower_pct: int) -> float:
"""
Jitted compute of the difference between the ``upper`` and ``lower`` percentiles of the data as
a percentage of the median value.
Expand All @@ -159,13 +157,11 @@ def percentile_difference(
"""

upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(
data, lower_pct
)
upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct)
return np.abs(upper_val - lower_val) / np.median(data)

@staticmethod
@njit("(float32[:], float64,)", cache=True, fastmath=True)
@njit('(float32[:], float64,)', cache=True, fastmath=True)
def percent_beyond_n_std(data: np.ndarray, n: float) -> float:
"""
Jitted compute of the ratio of values in time-series more than N standard deviations from the mean of the time-series.
Expand Down Expand Up @@ -193,7 +189,7 @@ def percent_beyond_n_std(data: np.ndarray, n: float) -> float:
return np.argwhere(np.abs(data) > target).shape[0] / data.shape[0]

@staticmethod
@njit("(float32[:], int64, int64, )", cache=True, fastmath=True)
@njit('(float32[:], int64, int64, )', cache=True, fastmath=True)
def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: int):
"""
Jitted compute of the ratio of values in time-series that fall between the ``upper`` and ``lower`` percentile.
Expand All @@ -217,16 +213,11 @@ def percent_in_percentile_window(data: np.ndarray, upper_pct: int, lower_pct: in
:align: center
"""

upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(
data, lower_pct
)
return (
np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0]
/ data.shape[0]
)
upper_val, lower_val = np.percentile(data, upper_pct), np.percentile(data, lower_pct)
return np.argwhere((data <= upper_val) & (data >= lower_val)).flatten().shape[0] / data.shape[0]

@staticmethod
@njit("(float32[:],)", fastmath=True, cache=True)
@njit('(float32[:],)', fastmath=True, cache=True)
def petrosian_fractal_dimension(data: np.ndarray) -> float:
"""
Calculate the Petrosian Fractal Dimension (PFD) of a given time series data. The PFD is a measure of the
Expand Down Expand Up @@ -270,13 +261,10 @@ def petrosian_fractal_dimension(data: np.ndarray) -> float:
if zC == 0:
return -1.0

return np.log10(data.shape[0]) / (
np.log10(data.shape[0])
+ np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC))
)
return np.log10(data.shape[0]) / (np.log10(data.shape[0]) + np.log10(data.shape[0] / (data.shape[0] + 0.4 * zC)))

@staticmethod
@njit("(float32[:], int64)")
@njit('(float32[:], int64)')
def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
"""
Jitted compute of the Higuchi Fractal Dimension of a given time series data. The Higuchi Fractal Dimension provides a measure of the fractal
Expand Down Expand Up @@ -307,12 +295,8 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):
"""

L, N = np.zeros(kmax - 1), len(data)
x = np.hstack(
(
-np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32),
np.ones(kmax - 1).reshape(-1, 1).astype(np.float32),
)
)
x = np.hstack((-np.log(np.arange(2, kmax + 1)).reshape(-1, 1).astype(np.float32),
np.ones(kmax - 1).reshape(-1, 1).astype(np.float32)))
for k in prange(2, kmax + 1):
Lk = np.zeros(k)
for m in range(0, k):
Expand All @@ -326,11 +310,74 @@ def higuchi_fractal_dimension(data: np.ndarray, kmax: int = 10):

return np.linalg.lstsq(x, L.astype(np.float32))[0][0]

@staticmethod
@njit('(float32[:], int64, int64,)', fastmath=True)
def permutation_entropy(data: np.ndarray, dimension: int, delay: int) -> float:

"""
Calculate the permutation entropy of a time series.
Permutation entropy is a measure of the complexity of a time series data by quantifying
the irregularity and unpredictability of its order patterns. It is computed based on the
frequency of unique order patterns of a given dimension in the time series data.
The permutation entropy (PE) is calculated using the following formula:
.. math::
PE = - \\sum(p_i \\log(p_i))
where:
- PE is the permutation entropy.
- p_i is the probability of each unique order pattern.
:param numpy.ndarray data: The time series data for which permutation entropy is calculated.
:param int dimension: It specifies the length of the order patterns to be considered.
:param int delay: Time delay between elements in an order pattern.
:return float: The permutation entropy of the time series, indicating its complexity and predictability. A higher permutation entropy value indicates higher complexity and unpredictability in the time series.
:example:
>>> t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False)
>>> sine_wave = 1.0 * np.sin(2 * np.pi * 1.0 * t).astype(np.float32)
>>> TimeseriesFeatureMixin().permutation_entropy(data=sine_wave, dimension=3, delay=1)
>>> 0.701970058666407
>>> np.random.shuffle(sine_wave)
>>> TimeseriesFeatureMixin().permutation_entropy(data=sine_wave, dimension=3, delay=1)
>>> 1.79172449934604
"""

n, permutations, counts = len(data), List(), List()
for i in prange(n - (dimension - 1) * delay):
indices = np.arange(i, i + dimension * delay, delay)
permutation = List(np.argsort(data[indices]))
is_unique = True
for j in range(len(permutations)):
p = permutations[j]
if len(p) == len(permutation):
is_equal = True
for k in range(len(p)):
if p[k] != permutation[k]:
is_equal = False
break
if is_equal:
is_unique = False
counts[j] += 1
break
if is_unique:
permutations.append(permutation)
counts.append(1)

total_permutations = len(permutations)
probs = np.empty(total_permutations, dtype=types.float64)
for i in prange(total_permutations):
probs[i] = counts[i] / (n - (dimension - 1) * delay)

return -np.sum(probs * np.log(probs))

# t = np.linspace(0, 50, int(44100 * 2.0), endpoint=False)
# sine_wave = 1.0 * np.sin(2 * np.pi * 1.0 * t).astype(np.float32)
# TimeseriesFeatureMixin().petrosian_fractal_dimension(data=sine_wave)
# #1.0000398187022719
# np.random.shuffle(sine_wave)
# TimeseriesFeatureMixin().petrosian_fractal_dimension(data=sine_wave)
# #1.0211625348743218
# #1.0211625348743218

0 comments on commit 9e0aa21

Please sign in to comment.