diff --git a/docs/conf.py b/docs/conf.py index 07997d855..18e1909b2 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -45,7 +45,14 @@ latex_engine = 'xelatex' latex_elements = {'papersize': 'letterpaper'} - +mathjax_config = { + 'TeX': { + 'Macros': { + 'bar': r'\overline', + # Other macros can be added here + } + } +} # source_suffix = { # '.rst': 'restructuredtext', # '.ipynb': 'nbsphinx', diff --git a/docs/simba.statistics_mixin.rst b/docs/simba.statistics_mixin.rst index ed2a7a39b..3cf8237d2 100644 --- a/docs/simba.statistics_mixin.rst +++ b/docs/simba.statistics_mixin.rst @@ -4,7 +4,6 @@ Statistics mixin .. autoclass:: simba.mixins.statistics_mixin.Statistics :members: :undoc-members: - :inherited-members: Statistics GPU methods @@ -12,5 +11,4 @@ Statistics GPU methods .. automodule:: simba.data_processors.cuda.statistics :members: - :undoc-members: - :show-inheritance: \ No newline at end of file + :undoc-members: \ No newline at end of file diff --git a/simba/mixins/circular_statistics.py b/simba/mixins/circular_statistics.py index 48ebee904..fd12fdd42 100644 --- a/simba/mixins/circular_statistics.py +++ b/simba/mixins/circular_statistics.py @@ -56,7 +56,7 @@ def mean_resultant_vector_length(data: np.ndarray) -> float: data points towards a central direction on the circle with a range between 0 and 1. .. image:: _static/img/mean_resultant_vector.png - :width: 600 + :width: 400 :align: center .. math:: @@ -71,7 +71,8 @@ def mean_resultant_vector_length(data: np.ndarray) -> float: :parameter np.ndarray data: 1D array of size len(frames) representing angles in degrees. - :returns float: The mean resultant vector of the angles. 1 represents tendency towards a single point. 0 represents no central point. + :returns: The mean resultant vector of the angles. 1 represents tendency towards a single point. 0 represents no central point. 
+ :rtype: float :example: >>> data = np.array([50, 90, 70, 60, 20, 90]).astype(np.float32) @@ -133,6 +134,14 @@ def circular_mean(data: np.ndarray) -> float: """ Jitted compute of the circular mean of single sample. + .. math:: + \mu = \text{atan2}\left(\frac{1}{N} \sum_{i=1}^{N} \sin(\theta_i), \frac{1}{N} \sum_{i=1}^{N} \cos(\theta_i)\right) + + Where: + - :math:`\mu` is the circular mean in degrees. + - :math:`\theta_i` are the individual angles in degrees. + - :math:`N` is the number of samples. + :param np.ndarray data: 1D array of size len(frames) representing angles in degrees. :returns: The circular mean of the angles in degrees. :rtype: float @@ -327,8 +336,8 @@ def degrees_to_cardinal(data: np.ndarray) -> List[str]: Convert degree angles to cardinal direction bucket e.g., 0 -> "N", 180 -> "S" .. note:: - To convert cardinal literals to integers, map using ``simba.utils.enums.lookups.cardinality_to_integer_lookup``. - To convert integers to cardinal literals, map using ``simba.utils.enums.lookups.integer_to_cardinality_lookup``. + To convert cardinal literals to integers, map using :func:`simba.utils.enums.lookups.cardinality_to_integer_lookup`. + To convert integers to cardinal literals, map using :func:`simba.utils.enums.lookups.integer_to_cardinality_lookup`. .. image:: _static/img/degrees_to_cardinal.png :width: 600 @@ -480,10 +489,11 @@ def rayleigh(data: np.ndarray) -> Tuple[float, float]: """ Jitted compute of Rayleigh Z (test of non-uniformity) of single sample of circular data in degrees. - .. note: + .. note:: Adapted from ``pingouin.circular.circ_rayleigh`` and ``pycircstat.tests.rayleigh``. - The Rayleigh Z score is calculated as follows: + + The Rayleigh Z score is calculated as follows: .. math:: Z = nR^2 @@ -603,13 +613,21 @@ def sliding_circular_correlation(sample_1: np.ndarray, sample_2: np.ndarray, tim .. note:: Values prior to the ending of the first time window will be filles with ``0``. + .. 
math:: + r = \frac{\sum \sin(\theta_1 - \bar{\theta_1}) \cdot \sin(\theta_2 - \bar{\theta_2})}{\sqrt{\sum \sin^2(\theta_1 - \bar{\theta_1}) \cdot \sum \sin^2(\theta_2 - \bar{\theta_2})}} + + Where: + - :math:`r` is the circular correlation coefficient. + - :math:`\theta_1` and :math:`\theta_2` are the angular data points from the two samples. + - :math:`\bar{\theta_1}` and :math:`\bar{\theta_2}` are the mean angles of the two samples. + .. seealso: :func:`simba.mixins.circular_statistics.CircularStatisticsMixin.circular_correlation` - :parameter np.ndarray sample_1: Angular data for e.g., Animal 1 - :parameter np.ndarray sample_1: Angular data for e.g., Animal 2 - :parameter float time_windows: Size of sliding time window in seconds. E.g., two windows of 0.5s and 1s would be represented as np.array([0.5, 1.0]) - :parameter int fps: Frame-rate of recorded video. + :param np.ndarray sample_1: Angular data for e.g., Animal 1 + :param np.ndarray sample_2: Angular data for e.g., Animal 2 + :param np.ndarray time_windows: Size of sliding time window in seconds. E.g., two windows of 0.5s and 1s would be represented as np.array([0.5, 1.0]) + :param int fps: Frame-rate of recorded video. :return: Array of size len(sample_1) x len(time_window) with correlation coefficients. 
:rtype: np.ndarray @@ -626,12 +644,8 @@ def sliding_circular_correlation(sample_1: np.ndarray, sample_2: np.ndarray, tim for j in prange(win_size, sample_1.shape[0] + 1): data_1_window = sample_1[j - win_size : j] data_2_window = sample_2[j - win_size : j] - m1 = np.arctan2( - np.mean(np.sin(data_1_window)), np.mean(np.cos(data_1_window)) - ) - m2 = np.arctan2( - np.mean(np.sin(data_2_window)), np.mean(np.cos(data_2_window)) - ) + m1 = np.arctan2(np.mean(np.sin(data_1_window)), np.mean(np.cos(data_1_window))) + m2 = np.arctan2(np.mean(np.sin(data_2_window)), np.mean(np.cos(data_2_window))) sin_1, sin_2 = np.sin(data_1_window - m1), np.sin(data_2_window - m2) denominator = np.sqrt(np.sum(sin_1 * sin_1) * np.sum(sin_2 * sin_2)) numerator = np.sum(sin_1 * sin_2) @@ -743,6 +757,15 @@ def rao_spacing(data: np.array): Computes the uniformity of a circular dataset in degrees. Low output values represent concentrated angularity, while high values represent dispersed angularity. + The Rao's Spacing (:math:`U`) is calculated as follows: + + .. math:: + + U = \\frac{1}{2} \\sum_{i=1}^{N} |l - T_i| + + where :math:`N` is the number of data points in the sliding window, :math:`T_i` is the spacing between adjacent data points, and :math:`l` is the equal angular spacing. + + :parameter ndarray data: 1D array of size len(frames) with data in degrees. :return: Rao's spacing measure, indicating the dispersion or concentration of angular data points. :rtype: int @@ -836,12 +859,24 @@ def kuipers_two_sample_test(sample_1: np.ndarray, sample_2: np.ndarray) -> float Kuiper's two-sample test is a non-parametric test used to determine if two samples are drawn from the same circular distribution. It is particularly useful for circular data, such as angles or directions. + The Kuiper test statistic is calculated as the sum of the maximum positive and negative deviations between the cumulative distribution functions of the two samples: + + .. 
math:: + + V = \max(F_1(\theta) - F_2(\theta)) + \max(F_2(\theta) - F_1(\theta)) + + Where: + + - :math:`F_1(\theta)` and :math:`F_2(\theta)` are the empirical cumulative distribution functions (CDFs) of the two circular samples. + - :math:`\theta` are the sorted angles in the two samples. + .. note:: Adapted from `Kuiper `__ by `Anne Archibald `_. .. seealso:: :func:`simba.mixins.circular_statistics.CircularStatisticsMixin.sliding_kuipers_two_sample_test` + :param ndarray data: The first circular sample array in degrees. :param ndarray data: The second circular sample array in degrees. :return: Kuiper's test statistic. @@ -904,9 +939,7 @@ def sliding_kuipers_two_sample_test(sample_1: np.ndarray, sample_2: np.ndarray, return results @staticmethod - def sliding_hodges_ajne( - data: np.ndarray, time_window: float, fps: int - ) -> np.ndarray: + def sliding_hodges_ajne(data: np.ndarray, time_window: float, fps: int) -> np.ndarray: data = np.deg2rad(data) results, window_size = np.full((data.shape[0]), -1.0), int(time_window * fps) @@ -1214,7 +1247,8 @@ def fit_circle(data: np.ndarray, max_iterations: Optional[int] = 400) -> np.ndar :parameter np.ndarray data: A 3D NumPy array with shape (N, M, 2). N represent frames, M represents the number of body-parts, and 2 represents x and y coordinates. :parameter int max_iterations: The maximum number of iterations for fitting the circle. 
- :returns np.ndarray: Array with shape (N, 3) with N representing frame and 3 representing (i) X-coordinate of the circle center, (ii) Y-coordinate of the circle center, and (iii) Radius of the circle + :return: Array with shape (N, 3) with N representing frame and 3 representing (i) X-coordinate of the circle center, (ii) Y-coordinate of the circle center, and (iii) Radius of the circle + :rtype: np.ndarray :example: >>> data = np.array([[[5, 10], [10, 5], [15, 10], [10, 15]]]) diff --git a/simba/mixins/feature_extraction_circular_mixin.py b/simba/mixins/feature_extraction_circular_mixin.py deleted file mode 100644 index 63157e16b..000000000 --- a/simba/mixins/feature_extraction_circular_mixin.py +++ /dev/null @@ -1,550 +0,0 @@ -from typing import List - -import numpy as np -from numba import jit, prange, typed -from scipy import stats - - -class FeatureExtractionCircularMixin(object): - """ - Mixin for circular statistics. Support for multiple animals and base - radial directions derived from two or three body-parts. - - .. important:: - See references below for mature packages computing extensive circular measurements. - - .. image:: _static/img/circular_statistics.png - :width: 800 - :align: center - - References - ---------- - .. [1] `pycircstat `_. - .. [2] `circstat `_. - .. [3] `pingouin.circular `_. - .. [4] `pycircular `_. - .. [5] `scipy.stats.directional_stats `_. - .. [6] `astropy.stats.circstats `_. - """ - - def __init__(self): - pass - - @staticmethod - def rolling_mean_dispersion( - data: np.ndarray, time_windows: np.ndarray, fps: int - ) -> np.ndarray: - """ - Compute the angular mean dispersion (circular mean) in degrees within rolling temporal windows. - - :parameter np.ndarray data: 1d array with feature values in degrees. - :parameter np.ndarray time_windows: Rolling time-windows as floats in seconds. 
E.g., [0.2, 0.4, 0.6] - :parameter int fps: fps of the recorded video - :returns np.ndarray: Size data.shape[0] x time_windows.shape[0] array - - .. image:: _static/img/mean_rolling_timeseries_angle.png - :width: 1000 - :align: center - - .. attention:: - The returned values represents the angular mean dispersion in the time-window ``[current_frame-time_window->current_frame]``. - `-1` is returned when ``current_frame-time_window`` is less than 0. - - :example: - >>> data = np.random.normal(loc=45, scale=1, size=20) - >>> FeatureExtractionCircularMixin().rolling_mean_dispersion(data=data,time_windows=np.array([0.5]), fps=10) - >>> [ [-1],[-1],[-1],[-1], [-1],[44],[44],[43],[44],[44],[44],[44],[44],[44],[44],[45],[45],[45],[45],[45]]) - """ - - results = np.full((data.shape[0], time_windows.shape[0]), -1) - for time_window in prange(time_windows.shape[0]): - jump_frms = int(time_windows[time_window] * fps) - for current_frm in prange(jump_frms, results.shape[0] + 1): - data_window = np.deg2rad(data[current_frm - jump_frms : current_frm]) - results[current_frm - 1][time_window] = np.rad2deg( - stats.circmean(data_window) - ).astype(int) - return results - - @staticmethod - @jit(nopython=True) - def degrees_to_compass_cardinal(degree_angles: np.ndarray) -> List[str]: - """ - Convert degree angles to cardinal direction bucket e.g., 0 -> "N", 180 -> "S" - - .. note:: - To convert cardinal literals to integers, map using ``simba.utils.enums.lookups.cardinality_to_integer_lookup``. - To convert integers to cardinal literals, map using ``simba.utils.enums.lookups.integer_to_cardinality_lookup``. - - :parameter degree_angles nose_loc: 1d array of degrees. Note: return by ``self.head_direction``. 
- :return List[str]: List of strings representing frame-wise cardinality - - :example: - >>> data = np.array(list(range(0, 405, 45))) - >>> FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data) - >>> ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', 'N'] - """ - results = typed.List(["str"]) - DIRECTIONS = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"] - for i in prange(degree_angles.shape[0]): - ix = round(degree_angles[i] / (360.0 / len(DIRECTIONS))) - direction = DIRECTIONS[ix % len(DIRECTIONS)] - results.append(direction) - return results[1:] - - @staticmethod - @jit(nopython=True) - def direction_three_bps( - nose_loc: np.ndarray, left_ear_loc: np.ndarray, right_ear_loc: np.ndarray - ) -> np.ndarray: - """ - Jitted helper to compute the degree angle from three body-parts. Computes the angle in degrees left_ear <-> nose - and right_ear_nose and returns the midpoint. - - .. image:: _static/img/angle_from_3_bps.png - :width: 600 - :align: center - - :parameter ndarray nose_loc: 2D array of size len(frames)x2 representing nose coordinates - :parameter ndarray left_ear_loc: 2D array of size len(frames)x2 representing left ear coordinates - :parameter ndarray right_ear_loc: 2D array of size len(frames)x2 representing right ear coordinates - :return np.ndarray: Array of size nose_loc.shape[0] with direction in degrees. 
- - :example: - >>> nose_loc = np.random.randint(low=0, high=500, size=(50, 2)).astype('float32') - >>> left_ear_loc = np.random.randint(low=0, high=500, size=(50, 2)).astype('float32') - >>> right_ear_loc = np.random.randint(low=0, high=500, size=(50, 2)).astype('float32') - >>> results = FeatureExtractionCircularMixin().direction_three_bps(nose_loc=nose_loc, left_ear_loc=left_ear_loc, right_ear_loc=right_ear_loc) - """ - - results = np.full((nose_loc.shape[0]), np.nan) - for i in prange(nose_loc.shape[0]): - left_ear_to_nose = np.degrees( - np.arctan2( - left_ear_loc[i][0] - nose_loc[i][1], - left_ear_loc[i][1] - nose_loc[i][0], - ) - ) - right_ear_nose = np.degrees( - np.arctan2( - right_ear_loc[i][0] - nose_loc[i][1], - right_ear_loc[i][1] - nose_loc[i][0], - ) - ) - results[i] = ((left_ear_to_nose + right_ear_nose) % 360) / 2 - return results - - @staticmethod - @jit(nopython=True) - def direction_two_bps(bp_x: np.ndarray, bp_y: np.ndarray) -> np.ndarray: - """ - Jitted method computing degree directionality from two body-parts. E.g., ``nape`` and ``nose``, - or ``swim_bladder`` and ``tail``. - - .. image:: _static/img/angle_from_2_bps.png - :width: 1200 - :align: center - - :parameter np.ndarray bp_x: Size len(frames) x 2 representing x and y coordinates for first body-part. - :parameter np.ndarray bp_y: Size len(frames) x 2 representing x and y coordinates for second body-part. - :return np.ndarray: Frame-wise directionality in degrees. 
- - :example: - >>> swim_bladder_loc = np.random.randint(low=0, high=500, size=(50, 2)).astype('float32') - >>> tail_loc = np.random.randint(low=0, high=500, size=(50, 2)).astype('float32') - >>> results = FeatureExtractionCircularMixin().direction_three_bps(bp_x=swim_bladder_loc, bp_y=tail_loc) - """ - - results = np.full((bp_x.shape[0]), np.nan) - for i in prange(bp_x.shape[0]): - angle_degrees = np.degrees( - np.arctan2(bp_x[i][0] - bp_y[i][0], bp_y[i][1] - bp_x[i][1]) - ) - angle_degrees = angle_degrees + 360 if angle_degrees < 0 else angle_degrees - results[i] = angle_degrees - return results - - @staticmethod - @jit(nopython=True) - def rolling_resultant_vector_length( - data: np.ndarray, fps: int, time_windows: np.ndarray = np.array([1.0]) - ) -> np.ndarray: - """ - Jitted helper computing the mean resultant vector within rolling time window. - - .. note: - Adapted from ``pingouin.circular.circ_r``. - - .. attention:: - The returned values represents resultant vector length in the time-window ``[current_frame-time_window->current_frame]``. - `-1` is returned where ``current_frame-time_window`` is less than 0. - - :parameter np.ndarray data: 1D array of size len(frames) representing degrees. - :parameter np.ndarray time_window: Rolling time-window as float in seconds. Default: 1s rolling time-window. - :parameter int fps: fps of the recorded video - :returns np.ndarray: Size len(frames) representing resultant vector length in the prior ``time_window``. 
- - :example: - >>> data_1, data_2 = np.random.normal(loc=45, scale=3, size=20), np.random.normal(loc=45, scale=150, size=30) - >>> data = np.hstack([data_1, data_2]) - >>> FeatureExtractionCircularMixin().rolling_resultant_vector_length(data=data,time_windows=np.array([1]), fps=20) - >>> [[-1],[-1],[-1],[-1],[-1],[ 0.998],[ 0.999],[ 0.999],[ 0.999],[ 0.997],[ 0.997],[ 0.996],[ 0.996],[ 0.996],[ 0.998],[ 0.998],[ 0.999],[ 0.998],[ 0.998],[ 0.999],[ 0.998],[ 0.655],[ 0.644],[ 0.367],[ 0.277],[ 0.193],[ 0.582],[ 0.200],[ 0.254],[ 0.235],[ 0.126],[ 0.145],[ 0.336],[ 0.719],[ 0.682],[ 0.780],[ 0.576],[ 0.314],[ 0.333],[ 0.291],[ 0.304],[ 0.095],[ 0.410],[ 0.061],[ 0.052],[ 0.262],[ 0.217],[ 0.485],[ 0.411],[ 0.736]]) - """ - - data = np.deg2rad(data) - results = np.full((data.shape[0], time_windows.shape[0]), -1.0) - for time_window_cnt in prange(time_windows.shape[0]): - window_size = int(time_windows[time_window_cnt] * fps) - for window_end in prange(window_size, data.shape[0] + 1, 1): - window_data = data[window_end - window_size : window_end] - w = np.ones(window_data.shape[0]) - r = np.nansum(np.multiply(w, np.exp(1j * window_data))) - results[window_end - 1][time_window_cnt] = np.abs(r) / np.nansum(w) - return results - - @staticmethod - @jit(nopython=True) - def _helper_rayleigh_z(data: np.ndarray, window_size: int): - results = np.full((data.shape[0], 2), np.nan) - for i in range(data.shape[0]): - r = window_size * data[i] - results[i][0] = (r**2) / window_size - results[i][1] = np.exp( - np.sqrt(1 + 4 * window_size + 4 * (window_size**2 - r**2)) - - (1 + 2 * window_size) - ) - return results - - def rolling_rayleigh_z( - self, data: np.ndarray, fps: int, time_window: float = 1.0 - ) -> np.array: - """ - Compute Rayleigh Z (test of non-uniformity) of circular data within rolling time-window. - - .. note: - Adapted from ``pingouin.circular.circ_rayleigh``. - - :parameter ndarray data: 1D array of size len(frames) representing degrees. 
- :parameter np.ndarray time_window: Rolling time-window as float in seconds. Default: 1s rolling time-window. - :parameter int fps: fps of the recorded video - :returns np.ndarray: Size data.shape[0] x 2 with Rayleigh Z statistics in first column and associated p_values in second column - """ - - results, window_size = np.full((data.shape[0], 2), np.nan), int( - time_window * fps - ) - resultant_vector_lengths = ( - FeatureExtractionCircularMixin().rolling_resultant_vector_length( - data=data, fps=fps, time_window=time_window - ) - ) - return np.nan_to_num( - self._helper_rayleigh_z( - data=resultant_vector_lengths, window_size=window_size - ), - nan=-1.0, - ) - - @staticmethod - @jit(nopython=True) - def rolling_circular_correlation( - data_x: np.ndarray, data_y: np.ndarray, fps: int, time_window: float = 1.0 - ) -> np.ndarray: - """ - Compute correlations between two angular distributions in rolling time-windows. - - .. image:: _static/img/cicle_correlation.png - :width: 800 - :align: center - - :parameter np.ndarray data_x: Angular data for e.g., Animal 1 - :parameter np.ndarray data_y: Angular data for e.g., Animal 2 - :parameter int fps: Frame-rate of video. - :parameter float time_window: Time window in seconds. - :return np.ndarray: Correlation coefficients for ``data_x`` and ``data_y``. 
- """ - - data_x, data_y = np.deg2rad(data_x), np.deg2rad(data_y) - results = np.full((data_x.shape[0]), np.nan) - window_size = int(time_window * fps) - for window_start in prange(0, data_x.shape[0] - window_size + 1): - data_x_window = data_x[window_start : window_start + window_size] - data_y_window = data_y[window_start : window_start + window_size] - x_sin = np.sin( - data_x_window - - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_x_window)))) - ) - y_sin = np.sin( - data_y_window - - np.angle(np.nansum(np.multiply(1, np.exp(1j * data_y_window)))) - ) - r = np.sum(x_sin * y_sin) / np.sqrt(np.sum(x_sin**2) * np.sum(y_sin**2)) - results[window_start + window_size] = ( - np.sqrt( - (data_x_window.shape[0] * (x_sin**2).mean() * (y_sin**2).mean()) - / np.mean(x_sin**2 * y_sin**2) - ) - * r - ) - - return results - - @staticmethod - def rolling_circular_stdev( - data: np.ndarray, fps: int, time_windows: np.ndarray - ) -> np.ndarray: - """ - Compute standard deviation of angular data in rolling time windows. - - .. image:: _static/img/angle_stdev.png - :width: 800 - :align: center - - :parameter ndarray data: 1D array of size len(frames) representing degrees. - :parameter np.ndarray time_window: Rolling time-window as float in seconds. - :parameter int fps: fps of the recorded video - :returns np.ndarray: Size data.shape[0] x time_windows.shape[0] with angular standard deviations in rolling time windows. 
- - :example: - >>> data_1, data_2 = np.random.normal(loc=180, scale=3, size=10), np.random.normal(loc=90, scale=700, size=10) - >>> data = np.hstack([data_1, data_2]) - >>> results = FeatureExtractionCircularMixin().rolling_circular_stdev(data=data, time_windows=np.array([1]), fps=10) - """ - - data = np.deg2rad(data) - results = np.full((data.shape[0], time_windows.shape[0]), 0.0) - for time_window_cnt in prange(time_windows.shape[0]): - window_size = int(time_windows[time_window_cnt] * fps) - for window_end in prange(window_size, data.shape[0] + 1, 1): - window_data = data[window_end - window_size : window_end] - results[window_end - 1][time_window_cnt] = stats.circvar(window_data) - return np.round(results, 4) - - @staticmethod - @jit(nopython=True) - def rolling_angular_difference( - data: np.ndarray, fps: int, time_windows: np.ndarray - ): - """ - Computes the angular difference in the current frame versus N seconds previously. - For example, if the current angle is 45 degrees, and the angle N seconds previously was 350 degrees, then the difference - is 55 degrees. - - .. note:: - Frames where current frame - N seconds prior equal a negative value is populated with 0. - - :parameter ndarray data: 1D array of size len(frames) representing degrees. - :parameter np.ndarray time_window: Rolling time-window as float in seconds. 
- :parameter int fps: fps of the recorded video - - :example: - >>> data = np.array([350, 350, 1, 1]) - >>> FeatureExtractionCircularMixin().rolling_angular_difference(data=data, fps=1, time_windows=np.array([1])) - >>> [[ 0.], [ 0.], [11.], [ 0.]] - """ - - data = np.deg2rad(data) - results = np.full((data.shape[0], time_windows.shape[0]), 0.0) - for time_window_cnt in prange(time_windows.shape[0]): - window_size = int(time_windows[time_window_cnt] * fps) - for window_end in prange(window_size, data.shape[0], 1): - point_one, point_two = data[window_end - window_size], data[window_end] - print(point_one, point_two) - distance = np.pi - np.abs(np.pi - np.abs(point_one - point_two)) - results[window_end][time_window_cnt] = np.rad2deg(distance) - - return results - - @staticmethod - @jit(nopython=True) - def agg_angular_difference_timebins( - data: np.ndarray, fps: int, time_windows: np.ndarray - ): - """ - Compute the difference between the median angle in the current time-window versus the previous time window. - For example, computes the difference between the mean angle in the first 1s of the video versus - the second 1s of the video, the second 1s of the video versus the third 1s of the video, ... etc. - - .. note:: - The first time-bin of the video can't be compared against the prior time-bin of the video and the results - for this first time-bin will be populated with `0`. - - :parameter ndarray data: 1D array of size len(frames) representing degrees. - :parameter np.ndarray time_window: Rolling time-window as float in seconds. 
- :parameter int fps: fps of the recorded video - - :example: - >>> data = np.random.normal(loc=45, scale=3, size=20) - >>> FeatureExtractionCircularMixin().agg_angular_difference_timebins(data=data,time_windows=np.array([1]), fps=5) - """ - - data = np.deg2rad(data) - results = np.full((data.shape[0], time_windows.shape[0]), 0.0) - for time_window_cnt in prange(time_windows.shape[0]): - window_size = int(time_windows[time_window_cnt] * fps) - prior_window = [0, window_size] - for win_cnt, window_end in enumerate( - prange(int(window_size * 2), data.shape[0] + 1, window_size) - ): - window_start = (window_end - window_size) - 1 - current_data = data[window_start:window_end] - prior_data = data[prior_window[0] : prior_window[1]] - prior_median = np.arctan2( - np.median(np.cos(prior_data)), np.median(np.sin(prior_data)) - ) - current_median = np.arctan2( - np.median(np.cos(current_data)), np.median(np.sin(current_data)) - ) - distance = np.pi - np.abs(np.pi - np.abs(prior_median - current_median)) - results[window_start:window_end, win_cnt] = np.rad2deg(distance) - prior_window = [window_start, window_end] - - return results - - @staticmethod - @jit(nopython=True) - def instantaneous_angular_velocity(data: np.ndarray, bin_size: int = 1): - """ - Jitted compute of absolute angular change in the smallest possible time bin. - - .. note:: - If the smallest possible frame-to-frame time-bin in Video 1 is 33ms (recorded at 30fps), and the - smallest possible frame-to-frame time-bin in Video 2 is 66ms (recorded at 15fps) we have to correct for - this across recordings using the ``bin_size`` argument. E.g., when passing angular data from Video 1 - we would set bin_size to ``2``, and when passing angular data for Video 2 we would set bin_size to ``1`` to - allow comparisons of instantaneous angular velocity between Video 1 and Video 2. - - When current frame minus bin_size results in a negative index, 0 is returned. 
- - :parameter ndarray data: 1D array of size len(frames) representing degrees. - :parameter int bin_size: The number of frames prior to compare the current angular velocity against. - - :example: - >>> data = np.array([350, 355, 356, 357]) - >>> FeatureExtractionCircularMixin().instantaneous_angular_velocity(data=data, bin_size=1) - >>> [0., 5., 1., 1.] - >>> FeatureExtractionCircularMixin().instantaneous_angular_velocity(data=data, bin_size=2) - >>> [0., 0., 6., 2.] - """ - data = np.deg2rad(data) - results = np.full((data.shape[0]), 0.0) - left_idx, right_idx = 0, bin_size - for end_idx in prange(right_idx, data.shape[0] + 1, 1): - results[end_idx] = np.rad2deg( - np.pi - np.abs(np.pi - np.abs(data[left_idx] - data[end_idx])) - ) - left_idx += 1 - return results - - @staticmethod - def sliding_rao_spacing( - data: np.ndarray, time_window: float, fps: int - ) -> np.ndarray: - """ - Compute the uniformity of a circular dataset in sliding window of size ``time_window``. - - :parameter ndarray data: 1D array of size len(frames) representing degrees. - :parameter np.ndarray time_window: Rolling time-window as float in seconds. - :parameter int fps: fps of the recorded video - :return np.ndarray: representing rao-spacing U in every sliding windows [-window:n] - - .. image:: _static/img/raospacing.png - :width: 800 - :align: center - - :references: - .. [1] `UCSB `__. 
- - :example: - >>> data = np.random.randint(low=0, high=360, size=(500,)) - >>> result = FeatureExtractionCircularMixin().sliding_rao_spacing(data=data, time_window=0.5, fps=10) - """ - - results = np.full((data.shape[0]), -1.0) - window_size = int(time_window * fps) - for i in range(window_size, data.shape[0]): - w_data = np.sort(data[i - window_size : i]) - Ti, TiL = np.full((w_data.shape[0]), np.nan), np.full( - (w_data.shape[0]), np.nan - ) - l = 360 / len(w_data) - Ti[-1] = np.rad2deg( - np.pi - - np.abs(np.pi - np.abs(np.deg2rad(w_data[0]) - np.deg2rad(w_data[-1]))) - ) - for j in prange(w_data.shape[0] - 1, -1, -1): - Ti[j] = np.rad2deg( - np.pi - - np.abs( - np.pi - - np.abs(np.deg2rad(w_data[j]) - np.deg2rad(w_data[j - 1])) - ) - ) - for k in prange(Ti.shape[0]): - TiL[k] = np.max((l, Ti[k])) - np.min((l, Ti[k])) - S = np.sum(TiL) - U = int(S / 2) - results[i] = U - return results - - -# data = np.random.normal(loc=90, scale=360, size=500) -# data = np.random.randint(low=0, high=360, size=(500,)) -# result = FeatureExtractionCircularMixin().sliding_rao_spacing(data=data, time_window=0.5, fps=10) -# print(result) -# raospacing(np.deg2rad(data[-6:-1]), axis=0) - -# #data = np.random.normal(loc=90, scale=2, size=20) -# data = np.array([350, 355, 356, 357]) -# angle_data = FeatureExtractionCircularMixin().instantaneous_angular_velocity(data=data, bin_size=2) -# -# - - -# data = np.random.normal(loc=45, scale=3, size=20) -# FeatureExtractionCircularMixin().agg_angular_difference_timebins(data=data,time_windows=np.array([1]), fps=5) - - -# nose_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32') -# left_ear_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32') -# -# angle_data = FeatureExtractionCircularMixin().direction_two_bps(bp_x=nose_loc, bp_y=left_ear_loc) -# - - -# data_1 = np.random.normal(loc=45, scale=3, size=20) -# data_2 = np.random.normal(loc=45, scale=150, size=30) -# data = np.hstack([data_1, data_2]) -# 
FeatureExtractionCircularMixin().rolling_resultant_vector_length(data=data,time_windows=np.array([1]), fps=5) -# -# - - -# data = np.random.normal(loc=45, scale=1, size=20) -# FeatureExtractionCircularMixin().rolling_mean_dispersion(data=data,time_windows=np.array([0.5]), fps=10) - - -# data = np.array(list(range(0, 405, 45))) -# results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal(degree_angles=data) - - -data = np.array(list(range(0, 405, 45))) -results = FeatureExtractionCircularMixin().degrees_to_compass_cardinal( - degree_angles=data -) - - -# def direction_two_bps(bp_x: np.ndarray, -# bp_y: np.ndarray) -> np.ndarray: - - -# right_ear_loc = np.random.randint(low=0, high=500, size=(200, 2)).astype('float32') -# angle_data = FeatureExtractionCircularMixin().head_direction(nose_loc=nose_loc, left_ear_loc=left_ear_loc, right_ear_loc=right_ear_loc) -# -# resultant_length = FeatureExtractionCircularMixin().rolling_resultant_vector_length(data=angle_data.astype(np.int8), time_window=1.0, fps=25) -# -# #resultant_length = FeatureExtractionCircularMixin().rolling_rayleigh_z(data=angle_data.astype(np.int8), time_window=2.0, fps=5) - -# start = time.time() -# correlation = FeatureExtractionCircularMixin().rolling_circular_correlation(data_x=angle_data.astype(np.int8), data_y=angle_data.astype(np.int8), time_window=2.0, fps=5) -# print(time.time() - start) diff --git a/simba/mixins/statistics_mixin.py b/simba/mixins/statistics_mixin.py index 32ded6920..3e53be3db 100644 --- a/simba/mixins/statistics_mixin.py +++ b/simba/mixins/statistics_mixin.py @@ -158,9 +158,9 @@ def independent_samples_t( t = \frac{\bar{x}_1 - \bar{x}_2}{s_p \sqrt{\frac{1}{n_1} + \frac{1}{n_2}}} where: - - \\(\bar{x}_1\\) and \\(\bar{x}_2\\) are the means of sample_1 and sample_2 respectively, - - \\(s_p\\) is the pooled standard deviation, - - \\(n_1\\) and \\(n_2\\) are the sample sizes of sample_1 and sample_2 respectively. 
+ - :math:`\bar{x}_1` and :math:`\bar{x}_2` are the means of the two samples, + - :math:`s_p` is the pooled standard deviation, + - :math:`n_1` and :math:`n_2` are the sizes of the two samples. .. seealso:: :func:`simba.mixins.statistics_mixin.Statistics.rolling_independent_sample_t` @@ -218,8 +218,8 @@ def cohens_d(sample_1: np.ndarray, sample_2: np.ndarray) -> float: d = \\frac{{\\bar{x}_1 - \\bar{x}_2}}{{\\sqrt{{\\frac{{s_1^2 + s_2^2}}{2}}}}} where: - - \\(\\bar{x}_1\\) and \\(\\bar{x}_2\\) are the means of sample_1 and sample_2 respectively, - - \\(s_1\\) and \\(s_2\\) are the standard deviations of sample_1 and sample_2 respectively. + - :math:`\\bar{x}_1` and :math:`\\bar{x}_2` are the means of sample_1 and sample_2 respectively, + - :math:`s_1` and :math:`s_2` are the standard deviations of sample_1 and sample_2 respectively. :param ndarray sample_1: First 1d array representing feature values. :param ndarray sample_2: Second 1d array representing feature values. @@ -612,7 +612,7 @@ def jensen_shannon_divergence( JSD = 1: Indicates that the two distributions are maximally dissimilar. .. math:: - JSD = \frac{{KL(P_1 || M) + KL(P_2 || M)}}{2} + JSD = \frac{KL(P_1 || M) + KL(P_2 || M)}{2} :parameter ndarray sample_1: First 1d array representing feature values. :parameter ndarray sample_2: Second 1d array representing feature values. @@ -1033,20 +1033,19 @@ def kruskal_wallis(sample_1: np.ndarray, sample_2: np.ndarray) -> float: The Kruskal-Wallis test is a non-parametric method for testing whether samples originate from the same distribution. It ranks all the values from the combined samples, then calculates the H statistic based on the ranks. - .. 
math:: H = \\frac{{12}}{{n(n + 1)}} \\left(\\frac{{(\\sum R_{\text{sample1}})^2}}{{n_1}} + \\frac{{(\\sum R_{\text{sample2}})^2}}{{n_2}}\\right) - 3(n + 1) where: - - \( n \) is the total number of observations, - - \( n_1 \) and \( n_2 \) are the number of observations in sample 1 and sample 2 respectively, - - \( R_{\text{sample1}} \) and \( R_{\text{sample2}} \) are the sums of ranks for sample 1 and sample 2 respectively. - + - :math:`n` is the total number of observations, + - :math:`n_1` and :math:`n_2` are the number of observations in sample 1 and sample 2 respectively, + - :math:`R_{\text{sample1}}` and :math:`R_{\text{sample2}}` are the sums of ranks for sample 1 and sample 2 respectively. - :parameter ndarray sample_1: First 1d array representing feature values. - :parameter ndarray sample_2: Second 1d array representing feature values. - :returns float: Kruskal-Wallis H statistic. + :param ndarray sample_1: First 1d array representing feature values. + :param ndarray sample_2: Second 1d array representing feature values. + :returns: Kruskal-Wallis H statistic. + :rtype: float :example: >>> sample_1 = np.array([1, 1, 3, 4, 5]).astype(np.float64) @@ -1256,14 +1255,14 @@ def brunner_munzel(sample_1: np.ndarray, sample_2: np.ndarray) -> float: W = -\\frac{{n_x \\cdot n_y \\cdot (\\bar{R}_y - \\bar{R}_x)}}{{(n_x + n_y) \\cdot \\sqrt{{n_x \\cdot S_x + n_y \\cdot S_y}}}} where: - - \( n_x \) and \( n_y \) are the sizes of sample_1 and sample_2 respectively, - - \( \bar{R}_x \) and \( \bar{R}_y \) are the mean ranks of sample_1 and sample_2 respectively, - - \( S_x \) and \( S_y \) are the dispersion statistics of sample_1 and sample_2 respectively. + - :math:`n_x` and :math:`n_y` are the sizes of sample_1 and sample_2 respectively, + - :math:`\bar{R}_x` and :math:`\bar{R}_y` are the mean ranks of sample_1 and sample_2 respectively, + - :math:`S_x` and :math:`S_y` are the dispersion statistics of sample_1 and sample_2 respectively. 
:parameter ndarray sample_1: First 1d array representing feature values. :parameter ndarray sample_2: Second 1d array representing feature values. - :returns float: Brunner-Munzel W. - + :returns: Brunner-Munzel W. + :rtype: float :example: >>> sample_1, sample_2 = np.random.normal(loc=10, scale=2, size=10), np.random.normal(loc=20, scale=2, size=10) @@ -1626,11 +1625,31 @@ def chow_test(self): @njit("(int64[:, :]), bool_") def concordance_ratio(x: np.ndarray, invert: bool) -> float: """ - Calculate the concordance ratio of a 2D numpy array. + Calculate the concordance ratio of a 2D numpy array. The concordance ratio is a measure of agreement in a dataset. It is calculated as the ratio of the number of + rows that contain only one unique value to the total number of rows. + + The equation for the concordance ratio :math:`C` is given by: + + .. math:: + C = \frac{N_c}{N_t} + + where: + - :math:`N_c` is the count of rows with only one unique value, + - :math:`N_t` is the total number of rows in the array. + + If the `invert` parameter is set to `True`, the function will return the disconcordance ratio instead, defined as: + + .. math:: + D = \frac{N_d}{N_t} + + where: + + - :math:`N_d` is the count of rows with more than one unique value. :param np.ndarray x: A 2D numpy array with ordinals represented as integers. :param bool invert: If True, the concordance ratio is inverted, and disconcordance ratio is returned - :return float: The concordance ratio, representing the count of rows with only one unique value divided by the total number of rows in the array. + :return: The concordance ratio, representing the count of rows with only one unique value divided by the total number of rows in the array. + :rtype: float :example: >>> x = np.random.randint(0, 2, (5000, 4)) @@ -2427,12 +2446,11 @@ def eta_squared(x: np.ndarray, y: np.ndarray) -> float: Eta-squared (\(\eta^2\)) is calculated as the ratio of the sum of squares between groups to the total sum of squares. 
Range from 0 to 1, where larger values indicate a stronger effect size. - .. math:: - \eta^2 = \frac{SS_{between}}{SS_{between} + SS_{within}} + The equation for eta squared is defined as: :math:`\eta^2 = \frac{SS_{between}}{SS_{between} + SS_{within}}` where: - - \( SS_{between} \) is the sum of squares between groups. - - \( SS_{within} \) is the sum of squares within groups. + - :math:`SS_{between}` is the sum of squares between groups, + - :math:`SS_{within}` is the sum of squares within groups. :param np.ndarray x: 1D array containing the dependent variable data. :param np.ndarray y: 1d array containing the grouping variable (categorical) data of same size as ``x``. @@ -2463,7 +2481,8 @@ def sliding_eta_squared(x: np.ndarray, y: np.ndarray, window_sizes: np.ndarray, :param np.ndarray y: The array containing the grouping variable (categorical) data. :param np.ndarray window_sizes: 1D array of window sizes in seconds. :param int sample_rate: The sampling rate of the data in frames per second. - :return np.ndarray: Array of size x.shape[0] x window_sizes.shape[0] with sliding eta squared values. + :return: Array of size x.shape[0] x window_sizes.shape[0] with sliding eta squared values. + :rtype: np.ndarray :example: >>> x = np.random.randint(0, 10, (10000,)) @@ -2617,7 +2636,7 @@ def cohens_h(sample_1: np.ndarray, sample_2: np.ndarray) -> float: \\text{Cohen's h} = 2 \\arcsin\\left(\\sqrt{\\frac{\\sum\\text{sample\_1}}{N\_1}}\\right) - 2 \\arcsin\\left(\\sqrt{\\frac{\\sum\\text{sample\_2}}{N\_2}}\\right) - Where N_1 and N_2 are the sample sizes of sample_1 and sample_2, respectively. + Where :math:`N_1` and :math:`N_2` are the sample sizes of sample_1 and sample_2, respectively. :param np.ndarray sample_1: 1D array with binary [0, 1] values (e.g., first classifier inference values). :param np.ndarray sample_2: 1D array with binary [0, 1] values (e.g., second classifier inference values). 
@@ -2774,13 +2793,15 @@ def hamming_distance(x: np.ndarray, \\text{Hamming distance}(x, y) = \\frac{{\\sum_{i=1}^{n} w_i}}{{n}} where: - - \( n \) is the length of the vectors, - - \( w_i \) is the weight associated with the \( i \)th element of the vectors. + - :math:`n` is the length of the vectors, + - :math:`w_i` is the weight associated with the :math:`i`-th element of the vectors. :parameter np.ndarray x: First binary vector. :parameter np.ndarray x: Second binary vector. :parameter Optional[np.ndarray] w: Optional weights for each element. Can be classification probabilities. If not provided, equal weights are assumed. :parameter Optional[bool] sort: If True, sorts x and y prior to hamming distance calculation. Default, False. + :return: Hamming similarity + :rtype: float :example: >>> x, y = np.random.randint(0, 2, (10,)).astype(np.int8), np.random.randint(0, 2, (10,)).astype(np.int8) @@ -2963,12 +2984,16 @@ def hellinger_distance(self, x: np.ndarray, y: np.ndarray, bucket_method: Option H(P, Q) = \frac{1}{\sqrt{2}} \sqrt{ \sum_{i=1}^{n} (\sqrt{P(i)} - \sqrt{Q(i)})^2 } - where \( n \) is the number of bins in the histogram representation of the distributions. + where: + - :math:`P(i)` is the probability of the :math:`i`-th event in distribution :math:`P`, + - :math:`Q(i)` is the probability of the :math:`i`-th event in distribution :math:`Q`, + - :math:`n` is the number of events. :param np.ndarray x: First 1D array representing a probability distribution. :param np.ndarray y: Second 1D array representing a probability distribution. :param Optional[Literal['fd', 'doane', 'auto', 'scott', 'stone', 'rice', 'sturges', 'sqrt']] bucket_method: Method for computing histogram bins. Default is 'auto'. - :returns float: Hellinger distance between the two input probability distributions. + :returns: Hellinger distance between the two input probability distributions.
+ :rtype: float :example: >>> x = np.random.randint(0, 9000, (500000,)) @@ -2993,7 +3018,8 @@ def youden_j(sample_1: np.ndarray, sample_2: np.ndarray) -> float: :param sample_1: The first binary array. :param sample_2: The second binary array. - :return float: Youden's J statistic. + :return: Youden's J statistic. + :rtype: float """ check_valid_array(data=sample_1, source=f'{Statistics.youden_j.__name__} sample_1', accepted_ndims=(1,), accepted_values=[0, 1]) @@ -3104,10 +3130,10 @@ def cohens_kappa(sample_1: np.ndarray, sample_2: np.ndarray): \\kappa = 1 - \\frac{\sum{w_{ij} \\cdot D_{ij}}}{\\sum{w_{ij} \\cdot E_{ij}}} where: - - \( \kappa \) is Cohen's Kappa coefficient, - - \( w_{ij} \) are the weights, - - \( D_{ij} \) are the observed frequencies, - - \( E_{ij} \) are the expected frequencies. + - :math:`\kappa` is Cohen's Kappa coefficient, + - :math:`w_{ij}` are the weights, + - :math:`D_{ij}` are the observed frequencies, + - :math:`E_{ij}` are the expected frequencies. :param np.ndarray sample_1: The first binary sample, a 1D NumPy array of integers. :param np.ndarray sample_2: The second binary sample, a 1D NumPy array of integers. @@ -3154,15 +3180,16 @@ def d_prime( d' = \\Phi^{-1}(hit\\_rate) - \\Phi^{-1}(false\\_alarm\\_rate) where: - - \( \\Phi^{-1} \) is the inverse of the cumulative distribution function (CDF) of the normal distribution, - - \( hit\\_rate \) is the proportion of true positives correctly identified, - - \( false\\_alarm\\_rate \) is the proportion of false positives incorrectly identified. + - :math:`\\Phi^{-1}` is the inverse of the cumulative distribution function (CDF) of the normal distribution, + - :math:`hit\\_rate` is the proportion of true positives correctly identified, + - :math:`false\\_alarm\\_rate` is the proportion of false positives incorrectly identified. :param np.ndarray x: Boolean 1D array of response values, where 1 represents presence, and 0 representing absence. 
:param np.ndarray y: Boolean 1D array of ground truth, where 1 represents presence, and 0 representing absence. :param Optional[float] lower_limit: Lower limit to bound hit and false alarm rates. Defaults to 0.0001. :param Optional[float] upper_limit: Upper limit to bound hit and false alarm rates. Defaults to 0.9999. - :return float: The calculated d' (d-prime) value. + :return: The calculated d' (d-prime) value. + :rtype: float :example: >>> x = np.random.randint(0, 2, (1000,)) @@ -3293,12 +3320,25 @@ def cochrans_q(data: np.ndarray) -> Tuple[float, float]: It can be used to evaluate if the performance of multiple (>=2) classifiers on the same data is the same or significantly different. .. note:: - If two classifiers, consider ``simba.mixins.statistics.Statistics.mcnemar``. + If two classifiers, consider :func:`simba.mixins.statistics_mixin.Statistics.mcnemar`. Useful background: https://psych.unl.edu/psycrs/handcomp/hccochran.PDF - :param np.ndarray data: Two dimensional array of boolean values where axis 1 represents classifiers or features and rows represent frames. - :return Tuple[float, float]: Cochran's Q statistic signidicance value. + .. math:: + Q = \frac{(k - 1) \left( kG - \left( \sum_{j=1}^{k} C_j \right)^2 \right)}{kR - S} + + where: + + - :math:`k` is the number of classifiers, + - :math:`G = \sum_{j=1}^{k} C_j^2` (the sum of the squares of the column sums), + - :math:`C_j` is the sum of the :math:`j`-th column (number of successes for the :math:`j`-th classifier), + - :math:`R = \sum_{i=1}^{n} R_i` (the total number of successes across all classifiers), + - :math:`S = \sum_{i=1}^{n} R_i^2` (the sum of the squares of the row sums), + - :math:`R_i` is the sum of the :math:`i`-th row (number of successes for the :math:`i`-th observation). + + :param np.ndarray data: Two-dimensional array of boolean values where axis 1 represents classifiers or features and rows represent frames. + :return: Cochran's Q statistic and significance value.
+ :rtype: Tuple[float, float] :example: >>> data = np.random.randint(0, 2, (100000, 4)) @@ -3336,15 +3376,16 @@ def hartley_fmax(x: np.ndarray, y: np.ndarray) -> float: Values close to one represent closer to equal variance. .. math:: - - \text{Hartley's Fmax} = \frac{\max(\text{Var}(x), \text{Var}(y))}{\min(\text{Var}(x), \text{Var}(y))} + \text{Hartley's } F_{max} = \frac{\max(\text{Var}(x), \text{Var}(y))}{\min(\text{Var}(x), \text{Var}(y))} where: - - Var(x) is the variance of sample x, - - Var(y) is the variance of sample y. + - :math:`\text{Var}(x)` is the variance of sample :math:`x`, + - :math:`\text{Var}(y)` is the variance of sample :math:`y`. :param np.ndarray x: 1D array representing numeric data of the first group/feature. :param np.ndarray x: 1D array representing numeric data of the second group/feature. + :return: Hartley's Fmax statistic. + :rtype: float :example: >>> x = np.random.random((100,)) @@ -3379,17 +3420,17 @@ def grubbs_test(x: np.ndarray, left_tail: Optional[bool] = False) -> float: extreme value (either the minimum or maximum) and the sample mean, divided by the sample standard deviation. .. math:: - \text{Grubbs' Test Statistic} = \frac{|\bar{x} - x_{\text{min/max}}|}{s} where: - - \( \bar{x} \) is the sample mean, - - \( x_{\text{min/max}} \) is the minimum or maximum value of the sample (depending on the tail being tested), - - \( s \) is the sample standard deviation. + - :math:`\bar{x}` is the sample mean, + - :math:`x_{\text{min/max}}` is the minimum or maximum value of the sample (depending on the tail being tested), + - :math:`s` is the sample standard deviation. :param np.ndarray x: 1D array representing numeric data. :param Optional[bool] left_tail: If True, the test calculates the Grubbs' test statistic for the left tail (minimum value). If False (default), it calculates the statistic for the right tail (maximum value). - :return float: The computed Grubbs' test statistic. + :return: The computed Grubbs' test statistic. 
+ :rtype: float :example: >>> x = np.random.random((100,)) @@ -3550,7 +3591,7 @@ def dunn_index(x: np.ndarray, y: np.ndarray, sample: Optional[float] = None) -> The Dunn Index is given by: .. math:: - D = \frac{\min_{i \neq j} \{ \delta(C_i, C_j) \}}{\max_k \{ \Delta(C_k) \}} + D = \frac{\min_{i \neq j} \{ \delta(C_i, C_j) \}}{\max_k \{ \Delta(C_k) \}} where :math:`\delta(C_i, C_j)` is the distance between clusters :math:`C_i` and :math:`C_j`, and :math:`\Delta(C_k)` is the diameter of cluster :math:`C_k`. @@ -3679,13 +3720,14 @@ def calinski_harabasz(x: np.ndarray, y: np.ndarray) -> float: The Calinski-Harabasz score (CH) is calculated as: .. math:: - CH = \frac{B}{W} \times \frac{N - k}{k - 1} + + CH = \\frac{B}{W} \\times \\frac{N - k}{k - 1} where: - - B is the sum of squared distances between cluster centroids, - - W is the sum of squared distances from each point to its assigned cluster centroid, - - N is the total number of data points, - - k is the number of clusters. + - :math:`B` is the sum of squared distances between cluster centroids, + - :math:`W` is the sum of squared distances from each point to its assigned cluster centroid, + - :math:`N` is the total number of data points, + - :math:`k` is the number of clusters. :param x: 2D array representing the data points. Shape (n_samples, n_features/n_dimension). :param y: 2D array representing cluster labels for each data point. Shape (n_samples,). @@ -3781,9 +3823,9 @@ def fowlkes_mallows(x: np.ndarray, y: np.ndarray) -> float: FMI = \\sqrt{\\frac{TP}{TP + FP} \\times \\frac{TP}{TP + FN}} where: - - TP (True Positive) is the number of pairs of elements that are in the same cluster in both x and y, - - FP (False Positive) is the number of pairs of elements that are in the same cluster in y but not in x, - - FN (False Negative) is the number of pairs of elements that are in the same cluster in x but not in y. 
+ - :math:`TP` (True Positive) is the number of pairs of elements that are in the same cluster in both x and y, + - :math:`FP` (False Positive) is the number of pairs of elements that are in the same cluster in y but not in x, + - :math:`FN` (False Negative) is the number of pairs of elements that are in the same cluster in x but not in y. .. note:: Modified from `scikit-learn `_ @@ -3824,13 +3866,15 @@ def adjusted_mutual_info(x: np.ndarray, y: np.ndarray) -> float: \text{AMI}(x, y) = \frac{\text{MI}(x, y) - E(\text{MI}(x, y))}{\max(H(x), H(y)) - E(\text{MI}(x, y))} where: - - \text{MI}(x, y) \text{ is the mutual information between } x \text{ and } y. - - E(\text{MI}(x, y)) \text{ is the expected mutual information.} - - H(x) \text{ and } H(y) \text{ are the entropies of } x \text{ and } y, \text{ respectively.} + - :math:`\text{MI}(x, y)` is the mutual information between :math:`x` and :math:`y`. + - :math:`E(\text{MI}(x, y))` is the expected mutual information. + - :math:`H(x)` and :math:`H(y)` are the entropies of :math:`x` and :math:`y`, respectively. + :param np.ndarray x: 1D array representing the labels of the first model. :param np.ndarray y: 1D array representing the labels of the second model. - :return float: Score between 0 and 1, where 1 indicates perfect clustering agreement. + :return: Score between 0 and 1, where 1 indicates perfect clustering agreement. + :rtype: float """ check_valid_array( @@ -3861,8 +3905,10 @@ def czebyshev_distance(sample_1: np.ndarray, sample_2: np.ndarray) -> float: .. note:: Normalize arrays sample_1 and sample_2 before passing it to ensure accurate results. - .. math:: - D_\infty(p, q) = \max_i \left| p_i - q_i \right| + The equation for the Czebyshev distance is given by: :math:`D_\infty(p, q) = \max_i \left| p_i - q_i \right|`. + + .. seealso:: + :func:`simba.mixins.statistics_mixin.Statistics.sliding_czebyshev_distance` :param np.ndarray sample_1: The first sample, an N-dimensional NumPy array.
:param np.ndarray sample_2: The second sample, an N-dimensional NumPy array. @@ -3892,6 +3938,9 @@ def sliding_czebyshev_distance(x: np.ndarray, window_sizes: np.ndarray, sample_r .. note:: Normalize array x before passing it to ensure accurate results. + .. seealso:: + :func:`simba.mixins.statistics_mixin.Statistics.czebyshev_distance` + :param np.ndarray x: Input signal, a 2D array with shape (n_samples, n_features). :param np.ndarray window_sizes: Array containing window sizes for sliding computation. :param float sample_rate: Sampling rate of the signal. diff --git a/simba/mixins/video_processing_mixin.py b/simba/mixins/video_processing_mixin.py deleted file mode 100644 index 925c62dba..000000000 --- a/simba/mixins/video_processing_mixin.py +++ /dev/null @@ -1,94 +0,0 @@ -import functools -import multiprocessing -import os -import shutil -import subprocess - -try: - from typing import List -except: - from typing_extensions import List - -from simba.utils.checks import check_file_exist_and_readable, check_int -from simba.utils.read_write import get_fn_ext - - -class VideoProcessingMixin(object): - """ - Methods for videos processing - """ - - def __init__(self): - pass - - @staticmethod - def _chunk_video_helper(chunk_range, video_path, chunk_ranges, temp_dir): - start_time, end_time = chunk_range - chunk_index = chunk_ranges.index(chunk_range) - output_file = os.path.join(temp_dir, f"{chunk_index}.mp4") - command = 'ffmpeg -i "{}" -ss {} -to {} -c copy "{}" -y'.format( - video_path, start_time, end_time, output_file - ) - subprocess.call(command, shell=True) - return output_file - - @staticmethod - def _ffmpeg_cmd_multiprocessor(command: str): - print(command) - subprocess.call(command, shell=True) - - def split_video_into_n_cunks(self, video_path: str, n: int): - - dir, video_name, _ = get_fn_ext(filepath=video_path) - temp_dir = os.path.join(dir, video_name + "_temp") - if os.path.isdir(temp_dir): - shutil.rmtree(temp_dir) - if not os.path.isdir(temp_dir): -
os.makedirs(temp_dir) - os.makedirs(dir, exist_ok=True) - duration = float( - subprocess.check_output( - 'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 "{}" -hide_banner -loglevel error'.format( - video_path - ), - shell=True, - ) - ) - chunk_duration = duration / n - chunk_ranges = [ - (i * chunk_duration, (i + 1) * chunk_duration) for i in range(n) - ] - file_paths = [] - with multiprocessing.Pool(n, maxtasksperchild=10) as pool: - constants = functools.partial( - self._chunk_video_helper, - video_path=video_path, - chunk_ranges=chunk_ranges, - temp_dir=temp_dir, - ) - for cnt, result in enumerate( - pool.imap(constants, chunk_ranges, chunksize=1) - ): - file_paths.append(result) - pool.terminate() - pool.join() - return file_paths - - def create_ouput_paths(self, video_input_paths: List[os.PathLike]): - - temp_folder = os.path.join(os.path.dirname(video_input_paths[0]), "temp") - if os.path.isdir(temp_folder): - shutil.rmtree(temp_folder) - if not os.path.isdir(temp_folder): - os.makedirs(temp_folder) - output_paths = [ - os.path.join(temp_folder, os.path.basename(x)) for x in video_input_paths - ] - return output_paths - - -# video_processor = VideoProcessingMixin() -# -# video_processor.change_single_video_fps(video_path='/Users/simon/Desktop/Example_1_frame_no.mp4', core_cnt=5) -# -#