diff --git a/simba/mixins/circular_statistics.py b/simba/mixins/circular_statistics.py index 4121c79ce..9bf58a959 100644 --- a/simba/mixins/circular_statistics.py +++ b/simba/mixins/circular_statistics.py @@ -1,14 +1,11 @@ __author__ = "Simon Nilsson" import time -from typing import List, Tuple - import numpy as np -from numba import jit, njit, prange, typed - +from numba import jit, prange, typed, njit +from typing import List, Tuple from simba.utils.data import fast_mean_rank - class CircularStatisticsMixin(object): """ @@ -20,7 +17,9 @@ class CircularStatisticsMixin(object): These methods have support for multiple animals and base radial directions derived from two or three body-parts. Methods are adopted from the referenced packages below which are **far** more reliable. However, - runtime is prioritized and typically multiple orders of magnitude faster than referenced libraries. + runtime is prioritized and typically multiple orders of magnitude faster than referenced libraries. + + See image below for example of expected run-times for an example of the methods included in this class. .. note:: Many method has numba typed `signatures `_ to decrease @@ -33,6 +32,10 @@ class CircularStatisticsMixin(object): :width: 800 :align: center + .. image:: _static/img/circular_stats_runtimes.png + :width: 1200 + :align: center + References ---------- .. [1] `pycircstat `_. @@ -46,9 +49,11 @@ class CircularStatisticsMixin(object): def __init__(self): pass + @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def mean_resultant_vector_length(data: np.ndarray) -> float: + """ Jitted compute of the mean resultant vector length of a single sample. Captures the overall "pull" or "tendency" of the data points towards a central direction on the circle with a range between 0 and 1. @@ -64,16 +69,14 @@ def mean_resultant_vector_length(data: np.ndarray) -> float: data = np.deg2rad(data) mean_angles = np.arctan2(np.mean(np.sin(data)), np.mean(np.cos(data))) - return np.sqrt( - np.sum(np.cos(data - mean_angles)) ** 2 - + np.sum(np.sin(data - mean_angles)) ** 2 - ) / len(data) + return np.sqrt(np.sum(np.cos(data - mean_angles)) ** 2 + np.sum(np.sin(data - mean_angles)) ** 2) / len(data) @staticmethod - @njit("(float32[:], float64, float64[:])") - def sliding_mean_resultant_vector_length( - data: np.ndarray, fps: int, time_windows: np.ndarray - ) -> np.ndarray: + @njit('(float32[:], float64, float64[:])') + def sliding_mean_resultant_vector_length(data: np.ndarray, + fps: int, + time_windows: np.ndarray) -> np.ndarray: + """ Jitted compute of the mean resultant vector within sliding time window. Captures the overall "pull" or "tendency" of the data points towards a central direction on the circle with a range between 0 and 1. @@ -102,22 +105,17 @@ def sliding_mean_resultant_vector_length( results = np.full((data.shape[0], time_windows.shape[0]), -1.0) for time_window_cnt in prange(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for window_end in prange(window_size, data.shape[0] + 1, 1): - window_data = data[window_end - window_size : window_end] - print(window_data) - mean_angles = np.arctan2( - np.mean(np.sin(window_data)), np.mean(np.cos(window_data)) - ) - r = np.sqrt( - np.sum(np.cos(window_data - mean_angles)) ** 2 - + np.sum(np.sin(window_data - mean_angles)) ** 2 - ) / len(window_data) - results[window_end - 1] = r + for window_end in prange(window_size, data.shape[0]+1, 1): + window_data = data[window_end - window_size:window_end] + mean_angles = np.arctan2(np.mean(np.sin(window_data)), np.mean(np.cos(window_data))) + r = np.sqrt(np.sum(np.cos(window_data - mean_angles)) ** 2 + np.sum(np.sin(window_data - mean_angles)) ** 2) / len(window_data) + results[window_end-1] = r return results @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def circular_mean(data: np.ndarray) -> float: + """ Jitted compute of the circular mean of single sample. @@ -134,17 +132,14 @@ def circular_mean(data: np.ndarray) -> float: >>> 63.737892150878906 """ - return np.rad2deg( - np.arctan2( - np.mean(np.sin(np.deg2rad(data))), np.mean(np.cos(np.deg2rad(data))) - ) - ) + return np.rad2deg(np.arctan2(np.mean(np.sin(np.deg2rad(data))), np.mean(np.cos(np.deg2rad(data))))) @staticmethod - @njit("(float32[:], float64[:], float64)") - def sliding_circular_mean( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], float64)') + def sliding_circular_mean(data: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: + """ Compute the circular mean in degrees within sliding temporal windows. @@ -171,16 +166,12 @@ def sliding_circular_mean( for time_window in prange(time_windows.shape[0]): window_size = int(time_windows[time_window] * fps) for current_frm in prange(window_size, results.shape[0]): - data_window = np.deg2rad(data[current_frm - window_size : current_frm]) - results[current_frm, time_window] = np.rad2deg( - np.arctan2( - np.mean(np.sin(data_window)), np.mean(np.cos(data_window)) - ) - ) + data_window = np.deg2rad(data[current_frm - window_size:current_frm]) + results[current_frm, time_window] = np.rad2deg(np.arctan2(np.mean(np.sin(data_window)), np.mean(np.cos(data_window)))) return results @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def circular_std(data: np.ndarray) -> float: """ Jitted compute of the circular standard deviation from a single distribution of angles in degrees. @@ -201,11 +192,13 @@ def circular_std(data: np.ndarray) -> float: data = np.deg2rad(data) return np.rad2deg(np.sqrt(-2 * np.log(np.abs(np.mean(np.exp(1j * data)))))) + @staticmethod - @njit("(float32[:], int64, float64[:])") - def sliding_circular_std( - data: np.ndarray, fps: int, time_windows: np.ndarray - ) -> np.ndarray: + @njit('(float32[:], int64, float64[:])') + def sliding_circular_std(data: np.ndarray, + fps: int, + time_windows: np.ndarray) -> np.ndarray: + """ Compute standard deviation of angular data in sliding time windows. @@ -227,16 +220,16 @@ def sliding_circular_std( results = np.full((data.shape[0], time_windows.shape[0]), 0.0) for time_window_cnt in prange(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for window_end in prange(window_size, data.shape[0] + 1, 1): - window_data = data[window_end - window_size : window_end] - results[window_end - 1][time_window_cnt] = np.rad2deg( - np.sqrt(-2 * np.log(np.abs(np.mean(np.exp(1j * window_data))))) - ) + for window_end in prange(window_size, data.shape[0]+1, 1): + window_data = data[window_end - window_size:window_end] + results[window_end-1][time_window_cnt] = np.rad2deg(np.sqrt(-2 * np.log(np.abs(np.mean(np.exp(1j * window_data)))))) return results @staticmethod - @njit("(float32[:], int64)") - def instantaneous_angular_velocity(data: np.ndarray, bin_size: int): + @njit('(float32[:], int64)') + def instantaneous_angular_velocity(data: np.ndarray, + bin_size: int): + """ Jitted compute of absolute angular change in the smallest possible time bin. @@ -267,14 +260,12 @@ def instantaneous_angular_velocity(data: np.ndarray, bin_size: int): results = np.full((data.shape[0]), -1.0) left_idx, right_idx = 0, bin_size for end_idx in prange(right_idx, data.shape[0] + 1, 1): - results[end_idx] = np.rad2deg( - np.pi - np.abs(np.pi - np.abs(data[left_idx] - data[end_idx])) - ) + results[end_idx] = np.rad2deg(np.pi - np.abs(np.pi - np.abs(data[left_idx] - data[end_idx]))) left_idx += 1 return results @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def degrees_to_cardinal(degree_angles: np.ndarray) -> List[str]: """ Convert degree angles to cardinal direction bucket e.g., 0 -> "N", 180 -> "S" @@ -296,19 +287,20 @@ def degrees_to_cardinal(degree_angles: np.ndarray) -> List[str]: >>> ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', 'N'] """ - DIRECTIONS = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] - results = typed.List(["str"]) + DIRECTIONS = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW'] + results = typed.List(['str']) for i in prange(degree_angles.shape[0]): - ix = round(degree_angles[i] / (360.0 / len(DIRECTIONS))) + ix = round(degree_angles[i] / (360. / len(DIRECTIONS))) direction = DIRECTIONS[ix % len(DIRECTIONS)] results.append(direction) return results[1:] @staticmethod - @njit("(float32[:,:], float32[:, :], float32[:, :])") - def direction_three_bps( - nose_loc: np.ndarray, left_ear_loc: np.ndarray, right_ear_loc: np.ndarray - ) -> np.ndarray: + @njit('(float32[:,:], float32[:, :], float32[:, :])') + def direction_three_bps(nose_loc: np.ndarray, + left_ear_loc: np.ndarray, + right_ear_loc: np.ndarray) -> np.ndarray: + """ Jitted helper to compute the degree angle from three body-parts. Computes the angle in degrees left_ear <-> nose and right_ear_nose and returns the midpoint. @@ -332,25 +324,18 @@ def direction_three_bps( results = np.full((nose_loc.shape[0]), np.nan) for i in prange(nose_loc.shape[0]): left_ear_to_nose = np.degrees( - np.arctan2( - left_ear_loc[i][0] - nose_loc[i][1], - left_ear_loc[i][1] - nose_loc[i][0], - ) - ) + np.arctan2(left_ear_loc[i][0] - nose_loc[i][1], left_ear_loc[i][1] - nose_loc[i][0])) right_ear_nose = np.degrees( - np.arctan2( - right_ear_loc[i][0] - nose_loc[i][1], - right_ear_loc[i][1] - nose_loc[i][0], - ) - ) + np.arctan2(right_ear_loc[i][0] - nose_loc[i][1], right_ear_loc[i][1] - nose_loc[i][0])) results[i] = ((left_ear_to_nose + right_ear_nose) % 360) / 2 return results + @staticmethod - @njit("(float32[:, :], float32[:, :])") - def direction_two_bps( - swim_bladder_loc: np.ndarray, tail_loc: np.ndarray - ) -> np.ndarray: + @njit('(float32[:, :], float32[:, :])') + def direction_two_bps(swim_bladder_loc: np.ndarray, + tail_loc: np.ndarray) -> np.ndarray: + """ Jitted method computing degree directionality from two body-parts. E.g., ``nape`` and ``nose``, or ``swim_bladder`` and ``tail``. @@ -371,18 +356,13 @@ def direction_two_bps( results = np.full((swim_bladder_loc.shape[0]), np.nan) for i in prange(swim_bladder_loc.shape[0]): - angle_degrees = np.degrees( - np.arctan2( - swim_bladder_loc[i][0] - tail_loc[i][0], - tail_loc[i][1] - swim_bladder_loc[i][1], - ) - ) + angle_degrees = np.degrees(np.arctan2(swim_bladder_loc[i][0] - tail_loc[i][0], tail_loc[i][1] - swim_bladder_loc[i][1])) angle_degrees = angle_degrees + 360 if angle_degrees < 0 else angle_degrees results[i] = angle_degrees return results @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def rayleigh(data: np.ndarray) -> Tuple[float, float]: """ Jitted compute of Rayleigh Z (test of non-uniformity) of single sample of circular data in degrees. @@ -397,17 +377,14 @@ def rayleigh(data: np.ndarray) -> Tuple[float, float]: data = np.deg2rad(data) R = np.sqrt(np.sum(np.cos(data)) ** 2 + np.sum(np.sin(data)) ** 2) / len(data) - p = np.exp( - np.sqrt(1 + 4 * len(data) + 4 * (len(data) ** 2 - R**2)) - - (1 + 2 * len(data)) - ) - return len(data) * R**2, p + p = np.exp(np.sqrt(1 + 4 * len(data) + 4 * (len(data) ** 2 - R ** 2)) - (1 + 2 * len(data))) + return len(data) * R ** 2, p @staticmethod - @njit("(float32[:], float64[:], float64)") - def rolling_rayleigh_z( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> Tuple[np.ndarray, np.ndarray]: + @njit('(float32[:], float64[:], float64)') + def rolling_rayleigh_z(data: np.ndarray, + time_windows: np.ndarray, + fps: int) -> Tuple[np.ndarray, np.ndarray]: """ Jitted compute of Rayleigh Z (test of non-uniformity) of circular data within sliding time-window. @@ -425,26 +402,21 @@ def rolling_rayleigh_z( """ data = np.deg2rad(data) - Z_results, P_results = np.full( - (data.shape[0], time_windows.shape[0]), 0.0 - ), np.full((data.shape[0], time_windows.shape[0]), 0.0) + Z_results, P_results = np.full((data.shape[0], time_windows.shape[0]), 0.0), np.full((data.shape[0], time_windows.shape[0]), 0.0) for i in prange(time_windows.shape[0]): win_size = int(time_windows[i] * fps) - for j in prange(win_size, len(data) + 1): - data_win = data[j - win_size : j] - R = np.sqrt( - np.sum(np.cos(data_win)) ** 2 + np.sum(np.sin(data_win)) ** 2 - ) / len(data_win) - Z_results[j - 1][i] = len(data_win) * R**2 - P_results[j - 1][i] = np.exp( - np.sqrt(1 + 4 * len(data_win) + 4 * (len(data_win) ** 2 - R**2)) - - (1 + 2 * len(data_win)) - ) + for j in prange(win_size, len(data)+1): + data_win = data[j-win_size: j] + R = np.sqrt(np.sum(np.cos(data_win)) ** 2 + np.sum(np.sin(data_win)) ** 2) / len(data_win) + Z_results[j-1][i] = len(data_win) * R ** 2 + P_results[j-1][i] = np.exp(np.sqrt(1 + 4 * len(data_win) + 4 * (len(data_win) ** 2 - R ** 2)) - (1 + 2 * len(data_win))) return Z_results, P_results @staticmethod - @njit("(float32[:], float32[:],)") - def circular_correlation(sample_1: np.ndarray, sample_2: np.ndarray) -> float: + @njit('(float32[:], float32[:],)') + def circular_correlation(sample_1: np.ndarray, + sample_2: np.ndarray) -> float: + """ Jitted compute of circular correlation coefficient of two samples using the cross-correlation coefficient. Ranges from -1 to 1: 1 indicates perfect positive correlation, -1 indicates perfect negative correlation, 0 @@ -472,15 +444,15 @@ def circular_correlation(sample_1: np.ndarray, sample_2: np.ndarray) -> float: m1 = np.arctan2(np.mean(np.sin(sample_1)), np.mean(np.cos(sample_1))) m2 = np.arctan2(np.mean(np.sin(sample_2)), np.mean(np.cos(sample_2))) sin_1, sin_2 = np.sin(sample_1 - m1), np.sin(sample_2 - m2) - return np.sum(sin_1 * sin_2) / np.sqrt( - np.sum(sin_1 * sin_1) * np.sum(sin_2 * sin_2) - ) + return np.sum(sin_1 * sin_2) / np.sqrt(np.sum(sin_1 * sin_1) * np.sum(sin_2 * sin_2)) @staticmethod - @njit("(float32[:], float32[:], float64[:], int64)") - def sliding_circular_correlation( - sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: float - ) -> np.ndarray: + @njit('(float32[:], float32[:], float64[:], int64)') + def sliding_circular_correlation(sample_1: np.ndarray, + sample_2: np.ndarray, + time_windows: np.ndarray, + fps: float) -> np.ndarray: + """ Jitted compute of correlations between two angular distributions in sliding time-windows using the cross-correlation coefficient. @@ -506,28 +478,23 @@ def sliding_circular_correlation( results = np.full((sample_1.shape[0], time_windows.shape[0]), np.nan) for i in prange(time_windows.shape[0]): win_size = int(time_windows[i] * fps) - for j in prange(win_size, sample_1.shape[0] + 1): - data_1_window = sample_1[j - win_size : j] - data_2_window = sample_2[j - win_size : j] - m1 = np.arctan2( - np.mean(np.sin(data_1_window)), np.mean(np.cos(data_1_window)) - ) - m2 = np.arctan2( - np.mean(np.sin(data_2_window)), np.mean(np.cos(data_2_window)) - ) + for j in prange(win_size, sample_1.shape[0]+1): + data_1_window = sample_1[j-win_size:j] + data_2_window = sample_2[j-win_size:j] + m1 = np.arctan2(np.mean(np.sin(data_1_window)), np.mean(np.cos(data_1_window))) + m2 = np.arctan2(np.mean(np.sin(data_2_window)), np.mean(np.cos(data_2_window))) sin_1, sin_2 = np.sin(data_1_window - m1), np.sin(data_2_window - m2) - c = np.sum(sin_1 * sin_2) / np.sqrt( - np.sum(sin_1 * sin_1) * np.sum(sin_2 * sin_2) - ) + c = np.sum(sin_1 * sin_2) / np.sqrt(np.sum(sin_1 * sin_1) * np.sum(sin_2 * sin_2)) results[j][i] = c return results @staticmethod - @njit("(float32[:], float64[:], int64)") - def sliding_angular_diff( - data: np.ndarray, time_windows: np.ndarray, fps: float - ) -> np.ndarray: + @njit('(float32[:], float64[:], int64)') + def sliding_angular_diff(data: np.ndarray, + time_windows: np.ndarray, + fps: float) -> np.ndarray: + """ Computes the angular difference in the current frame versus N seconds previously. For example, if the current angle is 45 degrees, and the angle N seconds previously was 350 degrees, then the difference @@ -554,22 +521,19 @@ def sliding_angular_diff( results = np.full((data_rad.shape[0], time_windows.shape[0]), 0.0) for time_window_cnt in prange(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) - for left, right in zip( - prange(0, (data_rad.shape[0] - window_size) + 1), - prange(window_size - 1, data_rad.shape[0] + 1), - ): - distance = np.pi - np.abs( - np.pi - np.abs(data_rad[left] - data_rad[right]) - ) + for left, right in zip(prange(0, (data_rad.shape[0]-window_size)+ 1), prange(window_size-1, data_rad.shape[0] + 1)): + distance = np.pi - np.abs(np.pi - np.abs(data_rad[left] - data_rad[right])) results[right][time_window_cnt] = np.rad2deg(distance) return results + @staticmethod - @njit("(float32[:], float64[:], int64)") - def agg_angular_diff_timebins( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], int64)') + def agg_angular_diff_timebins(data: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: + """ Compute the difference between the median angle in the current time-window versus the previous time window. For example, computes the difference between the mean angle in the first 1s of the video versus @@ -596,33 +560,26 @@ def agg_angular_diff_timebins( for time_window_cnt in prange(time_windows.shape[0]): window_size = int(time_windows[time_window_cnt] * fps) prior_window = [0, window_size] - for win_cnt, window_end in enumerate( - prange(int(window_size * 2), data.shape[0] + 1, window_size) - ): + for win_cnt, window_end in enumerate(prange(int(window_size * 2), data.shape[0] + 1, window_size)): window_start = (window_end - window_size) - 1 current_data = data[window_start:window_end] - prior_data = data[prior_window[0] : prior_window[1]] - prior_median = np.arctan2( - np.sum(np.sin(prior_data)), np.sum(np.cos(prior_data)) - ) - if prior_median < 0: - prior_median += 2 * np.pi - current_median = np.arctan2( - np.sum(np.sin(current_data)), np.sum(np.cos(current_data)) - ) - if current_median < 0: - current_median += 2 * np.pi + prior_data = data[prior_window[0]: prior_window[1]] + prior_median = np.arctan2(np.sum(np.sin(prior_data)), np.sum(np.cos(prior_data))) + if prior_median < 0: prior_median += 2 * np.pi + current_median = np.arctan2(np.sum(np.sin(current_data)), np.sum(np.cos(current_data))) + if current_median < 0: current_median += 2 * np.pi distance = np.pi - np.abs(np.pi - np.abs(prior_median - current_median)) results[window_start:window_end, win_cnt] = np.rad2deg(distance) prior_window = [window_start, window_end] return results + @staticmethod - @njit("(float32[:], float64[:], int64)") - def sliding_rao_spacing( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], int64)') + def sliding_rao_spacing(data: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: """ Jitted compute of the uniformity of a circular dataset in sliding windows. @@ -647,25 +604,12 @@ def sliding_rao_spacing( for win_cnt in prange(time_windows.shape[0]): window_size = int(time_windows[win_cnt] * fps) for i in range(window_size, data.shape[0]): - w_data = np.sort(data[i - window_size : i]) - Ti, TiL = np.full((w_data.shape[0]), np.nan), np.full( - (w_data.shape[0]), np.nan - ) + w_data = np.sort(data[i-window_size: i]) + Ti, TiL = np.full((w_data.shape[0]), np.nan), np.full((w_data.shape[0]), np.nan) l = np.int8(360 / len(w_data)) - Ti[-1] = np.rad2deg( - np.pi - - np.abs( - np.pi - np.abs(np.deg2rad(w_data[0]) - np.deg2rad(w_data[-1])) - ) - ) - for j in prange(w_data.shape[0] - 1, -1, -1): - Ti[j] = np.rad2deg( - np.pi - - np.abs( - np.pi - - np.abs(np.deg2rad(w_data[j]) - np.deg2rad(w_data[j - 1])) - ) - ) + Ti[-1] = np.rad2deg(np.pi - np.abs(np.pi - np.abs(np.deg2rad(w_data[0]) - np.deg2rad(w_data[-1])))) + for j in prange(w_data.shape[0]-1, -1, -1): + Ti[j] = np.rad2deg(np.pi - np.abs(np.pi - np.abs(np.deg2rad(w_data[j]) - np.deg2rad(w_data[j-1])))) for k in prange(Ti.shape[0]): TiL[int(k)] = max((l, Ti[k])) - min((l, Ti[k])) S = np.sum(TiL) @@ -674,8 +618,10 @@ def sliding_rao_spacing( return results @staticmethod - @njit("(float32[:], float32[:])") - def kuipers_two_sample_test(sample_1: np.ndarray, sample_2: np.ndarray) -> float: + @njit('(float32[:], float32[:])') + def kuipers_two_sample_test(sample_1: np.ndarray, + sample_2: np.ndarray) -> float: + """ .. note:: Adapted from `Kuiper `__ by `Anne Archibald `_. @@ -685,20 +631,20 @@ def kuipers_two_sample_test(sample_1: np.ndarray, sample_2: np.ndarray) -> float >>> CircularStatisticsMixin().kuipers_two_sample_test(sample_1=sample_1, sample_2=sample_2) """ - sample_1, sample_2 = np.deg2rad(np.sort(sample_1)), np.deg2rad( - np.sort(sample_2) - ) + sample_1, sample_2 = np.deg2rad(np.sort(sample_1)), np.deg2rad(np.sort(sample_2)) cdfv1 = np.searchsorted(sample_2, sample_1) / float(len(sample_2)) cdfv2 = np.searchsorted(sample_1, sample_2) / float(len(sample_1)) - return np.amax( - cdfv1 - np.arange(len(sample_1)) / float(len(sample_1)) - ) + np.amax(cdfv2 - np.arange(len(sample_2)) / float(len(sample_2))) + return (np.amax(cdfv1 - np.arange(len(sample_1)) / float(len(sample_1))) + + np.amax(cdfv2 - np.arange(len(sample_2)) / float(len(sample_2)))) + @staticmethod - @njit("(float32[:], float32[:], float64[:], int64)") - def sliding_kuipers_two_sample_test( - sample_1: np.ndarray, sample_2: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float32[:], float64[:], int64)') + def sliding_kuipers_two_sample_test(sample_1: np.ndarray, + sample_2: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: + """ Jitted compute of Kuipers two-sample test comparing two distributions with sliding time window. @@ -711,33 +657,25 @@ def sliding_kuipers_two_sample_test( for time_window_cnt in prange(time_windows.shape[0]): win_size = int(time_windows[time_window_cnt] * fps) for i in range(win_size, sample_1.shape[0]): - sample_1_win, sample_2_win = ( - sample_1[i - win_size : i], - sample_2[i - win_size : i], - ) - cdfv1 = np.searchsorted(sample_2, sample_1_win) / float( - len(sample_2_win) - ) - cdfv2 = np.searchsorted(sample_1_win, sample_2_win) / float( - len(sample_1_win) - ) - D = np.amax( - cdfv1 - np.arange(len(sample_1_win)) / float(len(sample_1_win)) - ) + np.amax( - cdfv2 - np.arange(len(sample_2_win)) / float(len(sample_2_win)) - ) + sample_1_win, sample_2_win = sample_1[i-win_size:i], sample_2[i-win_size:i] + cdfv1 = np.searchsorted(sample_2, sample_1_win) / float(len(sample_2_win)) + cdfv2 = np.searchsorted(sample_1_win, sample_2_win) / float(len(sample_1_win)) + D = (np.amax(cdfv1 - np.arange(len(sample_1_win)) / float(len(sample_1_win))) + + np.amax(cdfv2 - np.arange(len(sample_2_win)) / float(len(sample_2_win)))) results[i][time_window_cnt] = D return results + @staticmethod - def sliding_hodges_ajne( - data: np.ndarray, time_window: float, fps: int - ) -> np.ndarray: + def sliding_hodges_ajne(data: np.ndarray, + time_window: float, + fps: int) -> np.ndarray: + data = np.deg2rad(data) results, window_size = np.full((data.shape[0]), -1.0), int(time_window * fps) for i in range(window_size, data.shape[0]): - w_data = data[i - window_size : i] + w_data = data[i-window_size: i] v = 1 - np.abs(np.mean(np.exp(1j * w_data))) n = len(w_data) H = n * (1 - v) @@ -753,13 +691,13 @@ def hodges_ajne(sample: np.ndarray): @staticmethod @jit(nopython=True) - def watson_williams_test(sample_1: np.ndarray, sample_2: np.ndarray): + def watson_williams_test(sample_1: np.ndarray, + sample_2: np.ndarray): + variance1 = 1 - np.abs(np.mean(np.exp(1j * sample_1))) variance2 = 1 - np.abs(np.mean(np.exp(1j * sample_2))) numerator = (variance1 + variance2) / 2 - denominator = (variance1**2 / len(sample_1)) + ( - variance2**2 / len(sample_2) - ) + denominator = (variance1 ** 2 / len(sample_1)) + (variance2 ** 2 / len(sample_2)) F = numerator / denominator return F @@ -771,7 +709,7 @@ def watsons_u(data: np.ndarray): return n * (1 - np.abs(mean_vector)) @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def circular_range(data: np.ndarray) -> float: """ Jitted compute of circular range in degrees. The range is defined as the angular span of the @@ -798,10 +736,11 @@ def circular_range(data: np.ndarray) -> float: return np.ceil(np.rad2deg(min(2 * np.pi - circular_range, data[-1] - data[0]))) @staticmethod - @njit("(float32[:], float64[:], int64)") - def sliding_circular_range( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: + @njit('(float32[:], float64[:], int64)') + def sliding_circular_range(data: np.ndarray, + time_windows: np.ndarray, + fps: int) -> np.ndarray: + """ Jitted compute of sliding circular range for a time series of circular data. The range is defined as the angular span of the shortest arc that can contain all the data points. Measures the circular spread of data within sliding time windows of specified duration. @@ -824,20 +763,15 @@ def sliding_circular_range( results = np.full((data.shape[0], time_windows.shape[0]), -1.0) for time_window_cnt in prange(time_windows.shape[0]): win_size = int(time_windows[time_window_cnt] * fps) - for left, right in zip( - prange(0, (data.shape[0] - win_size) + 1), - prange(win_size - 1, data.shape[0] + 1), - ): - sample = np.sort(np.deg2rad(data[left : right + 1])) + for left, right in zip(prange(0, (data.shape[0] - win_size) + 1), prange(win_size - 1, data.shape[0] + 1)): + sample = np.sort(np.deg2rad(data[left:right + 1])) angular_diffs = np.diff(sample) circular_range = np.max(angular_diffs) - results[right][time_window_cnt] = np.rad2deg( - min(2 * np.pi - circular_range, sample[-1] - sample[0]) - ) + results[right][time_window_cnt] = np.rad2deg(min(2 * np.pi - circular_range, sample[-1] - sample[0])) return results @staticmethod - @njit("(float32[:], int64[:, :],)") + @njit('(float32[:], int64[:, :],)') def circular_hotspots(data: np.ndarray, bins: np.ndarray) -> np.ndarray: """ Calculate the proportion of data points falling within circular bins. @@ -862,19 +796,18 @@ def circular_hotspots(data: np.ndarray, bins: np.ndarray) -> np.ndarray: results = np.full((bins.shape[0]), -1.0) for bin_cnt in range(bins.shape[0]): if bins[bin_cnt][0] > bins[bin_cnt][1]: - mask = ((data >= bins[bin_cnt][0]) & (data <= 360)) | ( - (data >= 0) & (data <= bins[bin_cnt][1]) - ) + mask = ((data >= bins[bin_cnt][0]) & (data <= 360)) | ((data >= 0) & (data <= bins[bin_cnt][1])) else: mask = (data >= bins[bin_cnt][0]) & (data <= bins[bin_cnt][1]) results[bin_cnt] = data[mask].shape[0] / data.shape[0] return results @staticmethod - @njit("(float32[:], int64[:, :], float64, float64)") - def sliding_circular_hotspots( - data: np.ndarray, bins: np.ndarray, time_window: float, fps: float - ) -> np.ndarray: + @njit('(float32[:], int64[:, :], float64, float64)') + def sliding_circular_hotspots(data: np.ndarray, + bins: np.ndarray, + time_window: float, + fps: float) -> np.ndarray: """ Jitted compute of sliding circular hotspots in a dataset. Calculates circular hotspots in a time-series dataset by sliding a time window across the data and computing hotspot statistics for specified circular bins. @@ -917,23 +850,19 @@ def sliding_circular_hotspots( """ results = np.full((data.shape[0], bins.shape[0]), -1.0) win_size = int(time_window * fps) - for left, right in zip( - prange(0, (data.shape[0] - win_size) + 1), - prange(win_size - 1, data.shape[0] + 1), - ): - sample = data[left : right + 1] + for left, right in zip(prange(0, (data.shape[0] - win_size) + 1), prange(win_size - 1, data.shape[0] + 1)): + sample = data[left:right + 1] for bin_cnt in range(bins.shape[0]): if bins[bin_cnt][0] > bins[bin_cnt][1]: mask = ((sample >= bins[bin_cnt][0]) & (sample <= 360)) | ( - (sample >= 0) & (sample <= bins[bin_cnt][1]) - ) + (sample >= 0) & (sample <= bins[bin_cnt][1])) else: mask = (sample >= bins[bin_cnt][0]) & (sample <= bins[bin_cnt][1]) results[right][bin_cnt] = data[mask].shape[0] / data.shape[0] return results @staticmethod - @njit("(float32[:],)") + @njit('(float32[:],)') def rotational_direction(data: np.ndarray) -> np.ndarray: """ Jitted compute of frame-by-frame rotational direction within a 1D timeseries array of angular data. @@ -960,12 +889,10 @@ def rotational_direction(data: np.ndarray) -> np.ndarray: data = data % 360 data_rad = np.deg2rad(data) - result, prior_angle = np.full((data.shape[0]), -1.0), data_rad[0] + result, prior_angle = np.full((data.shape[0]), -1.), data_rad[0] for i in prange(1, data.shape[0]): current_angle = data_rad[i] - distance = np.round( - np.rad2deg(np.pi - np.abs(np.pi - np.abs(current_angle - prior_angle))) - ) + distance = np.round(np.rad2deg(np.pi - np.abs(np.pi - np.abs(current_angle - prior_angle)))) if data[i - 1] == data[i]: result[i] = 0 elif data[i - 1] == (data[i] - distance) % 360: @@ -976,41 +903,34 @@ def rotational_direction(data: np.ndarray) -> np.ndarray: return result -# data = np.array([260, 280, 300, 340, 360, 0, 10, 350, 0, 15]).astype(np.float32) -# CircularStatisticsMixin().sliding_circular_range(data=data, time_windows=np.array([0.5]), fps=10) -# - -# sample_2 = np.array([110, 170, 190, 205, 255]).astype(np.float32) - - -# CircularStatisticsMixin().circular_correlation(sample_1=sample_1, sample_2=sample_2) -# data = np.array([350, 100, 110, 105, 1, 45, 301, 206, 180, 45]).astype(np.float32) -# CircularStatisticsMixin().sliding_angular_diff(data=data, time_windows=np.array([0.5]), fps=10) - -# CircularStatisticsMixin().sliding_mean_resultant_vector_length(data=data.astype(np.float32),time_windows=np.array([0.5]), fps=10) - - -# (π - |(π - (|time_window_1 - time_window_2|)| - -# np.rad2deg(np.arctan2(np.mean(np.sin(np.deg2rad(data))), np.mean(np.cos(np.deg2rad(data))))) - - -# sample_1 = np.deg2rad([10, 40, 90, 100, 90]) -# sample_2 = np.deg2rad([10, 40, 90, 100, 1]) +# data_sizes = [1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000] +# # # +# # for data_size in data_sizes: +# # data = np.random.randint(low=0, high=360, size=(data_size,)).astype(np.float32) +# # start = time.time() +# # CircularStatisticsMixin().rayleigh(data=data) +# # print(time.time() - start) # # -# # sample_2 = np.random.randint(low=0, high=360, size=(100,)).astype(np.float32) -# # D = CircularStatisticsMixin().sliding_kuipers_two_sample_test(sample_1=sample_1, sample_2=sample_2, time_windows=np.array([0.5, 5]), fps=2) -# -# D = CircularStatisticsMixin().watson_williams_test(sample_1=sample_1, sample_2=sample_2) -# print(D) - - -# data_2 = np.random.normal(loc=45, scale=1, size=100) -# -# data = np.array([70, 72, 85, 92, 92, 100, 110, 120, 120, 130, 130]).astype(np.float32) -# -# CircularStatisticsMixin().circular_std(data=data) - -# sqrt(-2 * log(abs(mean(exp(1j * data))))) +# for data_size in data_sizes: +# data = np.random.randint(low=0, high=360, size=(data_size,)).astype(np.float32) +# start = time.time() +# CircularStatisticsMixin().rolling_rayleigh_z(data=data, fps=10, time_windows=np.array([0.5, 1.0])) +# print(time.time() - start) + + +# for data_size in data_sizes: +# data = np.random.randint(low=0, high=360, size=(data_size,)).astype(np.float32) +# start = time.time() +# CircularStatisticsMixin().degrees_to_cardinal(degree_angles=data) +# print(time.time() - start) + + +# for data_size in data_sizes: +# nose_loc = np.random.randint(low=0, high=500, size=(data_size, 2)).astype(np.float32) +# left_ear_loc = np.random.randint(low=0, high=500, size=(data_size, 2)).astype(np.float32) +# right_ear_loc = np.random.randint(low=0, high=500, size=(data_size, 2)).astype(np.float32) +# start = time.time() +# results = CircularStatisticsMixin().direction_two_bps(swim_bladder_loc=nose_loc,tail_loc=left_ear_loc) +# print(time.time() - start) \ No newline at end of file