From c5389d7e73b8cf2e8dc4e3a5569bb776db7d6d44 Mon Sep 17 00:00:00 2001 From: sronilsson Date: Thu, 7 Nov 2024 11:38:08 -0500 Subject: [PATCH] timeseries features --- simba/mixins/timeseries_features_mixin.py | 382 ++++++++++++++++++++- simba/video_processors/video_processing.py | 2 +- 2 files changed, 380 insertions(+), 4 deletions(-) diff --git a/simba/mixins/timeseries_features_mixin.py b/simba/mixins/timeseries_features_mixin.py index 743b0b3d9..c8b25996e 100644 --- a/simba/mixins/timeseries_features_mixin.py +++ b/simba/mixins/timeseries_features_mixin.py @@ -22,10 +22,9 @@ import typing from typing import Optional, Tuple, get_type_hints -from simba.utils.checks import (check_float, check_instance, check_int, - check_str, check_that_column_exist, - check_valid_array, check_valid_lst) +from simba.utils.checks import (check_float, check_instance, check_int, check_str, check_that_column_exist, check_valid_array, check_valid_lst) from simba.utils.read_write import find_core_cnt +from simba.utils.enums import Formats class TimeseriesFeatureMixin(object): @@ -1844,3 +1843,380 @@ def sliding_pct_in_top_n( cnts = np.sort(np.unique(sample, return_counts=True)[1])[-n:] results[int(r - 1), i] = np.sum(cnts) / sample.shape[0] return results + + @staticmethod + def mean_squared_jerk(x: np.ndarray, + time_step: float, + sample_rate: float) -> float: + + """ + Calculate the Mean Squared Jerk (MSJ) for a given set of 2D positions over time. + + The Mean Squared Jerk is a measure of the smoothness of movement, calculated as the mean of + squared third derivatives of the position with respect to time. It provides an indication of + how abrupt or smooth a trajectory is, with higher values indicating more erratic movements. + + :param np.ndarray x: A 2D array where each row represents the [x, y] position at a time step. + :param float time_step: The time difference between successive positions in seconds. + :param float sample_rate: The rate at which the positions are sampled (samples per second). + :return: The computed Mean Squared Jerk for the input trajectory data. + :rtype: float + + :example I: + >>> x = np.random.randint(0, 500, (100, 2)) + >>> TimeseriesFeatureMixin.mean_squared_jerk(x=x, time_step=1.0, sample_rate=30) + """ + + check_float(name=f'{TimeseriesFeatureMixin.mean_squared_jerk.__name__} time_step', min_value=10e-6, value=time_step) + check_float(name=f'{TimeseriesFeatureMixin.mean_squared_jerk.__name__} sample_rate', min_value=10e-6, value=sample_rate) + check_valid_array(data=x, source=f'{TimeseriesFeatureMixin.mean_squared_jerk.__name__} x', accepted_ndims=(2,), accepted_axis_1_shape=[2, ], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + + frame_step = int(max(1.0, time_step * sample_rate)) + V = np.diff(x, axis=0) / frame_step + A = np.diff(V, axis=0) / frame_step + jerks = np.diff(A, axis=0) / frame_step + squared_jerks = np.sum(jerks ** 2, axis=1) + return float(np.mean(squared_jerks)) + + @staticmethod + def sliding_mean_squared_jerk(x: np.ndarray, + window_size: float, + sample_rate: float) -> np.ndarray: + """ + Calculates the mean squared jerk (rate of change of acceleration) for a position path in a sliding window. + + Jerk is the derivative of acceleration, and this function computes the mean squared jerk over sliding windows + across the entire path. High jerk values indicate abrupt changes in acceleration, while low values indicate + smoother motion. + + :param np.ndarray x: An (N, M) array representing the path of an object, where N is the number of samples (time steps) and M is the number of spatial dimensions (e.g., 2 for 2D motion). Each row represents the position at a time step. + :param float window_size: The size of each sliding window in seconds. This defines the interval over which the mean squared jerk is calculated. + :param float sample_rate: The sampling rate in Hz (samples per second), which is used to convert the window size from seconds to frames. + :return: A 1D array of length N, containing the mean squared jerk for each sliding window that ends at each time step. The first `frame_step` values will be NaN, as they do not have enough preceding data points to compute jerk over the full window. + :rtype: np.ndarray + + :example: + >>> x = np.random.randint(0, 500, (12, 2)) + >>> TimeseriesFeatureMixin.sliding_mean_squared_jerk(x=x, window_size=1.0, sample_rate=2) + + :example II: + >>> jerky_path = np.zeros((100, 2)) + >>> jerky_path[::10] = np.random.randint(0, 500, (10, 2)) + >>> non_jerky_path = np.linspace(0, 500, 100).reshape(-1, 1) + >>> non_jerky_path = np.hstack((non_jerky_path, non_jerky_path)) + >>> jerky_jerk_result = TimeseriesFeatureMixin.sliding_mean_squared_jerk(jerky_path, 1.0, 10) + >>> non_jerky_jerk_result = TimeseriesFeatureMixin.sliding_mean_squared_jerk(non_jerky_path, 1.0, 10) + """ + + V = np.diff(x, axis=0) + A = np.diff(V, axis=0) + frame_step = int(max(1.0, window_size * sample_rate)) + results = np.full(x.shape[0], fill_value=0, dtype=np.int64) + for r in range(frame_step, x.shape[0]): + l = r - frame_step + V_a = A[l:r, :] + jerks = np.diff(V_a, axis=0) + if jerks.shape[0] == 0: + results[r] = 0 + else: + results[r] = np.sum(jerks ** 2) / jerks.shape[0] + + return results + + @staticmethod + def linearity_index(x: np.ndarray) -> float: + + """ + Calculates the straightness (linearity) index of a path. + + :param np.ndarray x: An (N, M) array representing the path, where N is the number of points and M is the number of spatial dimensions (e.g., 2 for 2D or 3 for 3D). Each row represents the coordinates of a point along the path. + :return: The straightness index of the path, a value between 0 and 1, where 1 indicates a perfectly straight path. + :rtype: float + + :example: + >>> x = np.array([[0, 0], [1, 1], [2, 2], [3, 3], [4, 4], [5, 5]]) + >>> TimeseriesFeatureMixin.linearity_index(x=x) + >>> x = np.random.randint(0, 100, (100, 2)) + >>> TimeseriesFeatureMixin.linearity_index(x=x) + """ + + check_valid_array(data=x, source=f'{TimeseriesFeatureMixin.linearity_index.__name__} x', accepted_ndims=(2,), accepted_axis_1_shape=[2, ], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + straight_line_distance = np.linalg.norm(x[0] - x[-1]) + path_length = np.sum(np.linalg.norm(np.diff(x, axis=0), axis=1)) + if path_length == 0: + return 0.0 + else: + return straight_line_distance / path_length + + @staticmethod + def sliding_linearity_index(x: np.ndarray, + window_size: float, + sample_rate: float) -> np.ndarray: + + """ + Calculates the Linearity Index (Path Straightness) over a sliding window for a path represented by an array of points. + + The Linearity Index measures how straight a path is by comparing the straight-line distance between the start and end points of each window to the total distance traveled along the path. + + :param np.ndarray x: An (N, M) array representing the path, where N is the number of points and M is the number of spatial dimensions (e.g., 2 for 2D or 3 for 3D). Each row represents the coordinates of a point along the path. + :param float x: The size of the sliding window in seconds. This defines the time window over which the linearity index is calculated. The window size should be specified in seconds. + :param float sample_rate: The sample rate in Hz (samples per second), which is used to convert the window size from seconds to frames. + :return: A 1D array of length N, where each element represents the linearity index of the path within a sliding window. The value is a ratio between the straight-line distance and the actual path length for each window. Values range from 0 to 1, with 1 indicating a perfectly straight path. + :rtype: np.ndarray + + :example: + >>> x = np.random.randint(0, 100, (100, 2)) + >>> TimeseriesFeatureMixin.sliding_linearity_index(x=x, window_size=1, sample_rate=10) + """ + + frame_step = int(max(1.0, window_size * sample_rate)) + results = np.full(x.shape[0], fill_value=0.0, dtype=np.float32) + for r in range(frame_step, x.shape[0]): + l = r - frame_step + sample_x = x[l:r, :] + straight_line_distance = np.linalg.norm(sample_x[0] - sample_x[-1]) + path_length = np.sum(np.linalg.norm(np.diff(sample_x, axis=0), axis=1)) + if path_length == 0: + results[r] = 0.0 + else: + results[r] = straight_line_distance / path_length + return results + + @staticmethod + def entropy_of_directional_changes(x: np.ndarray, bins: int = 16) -> float: + """ + Computes the Entropy of Directional Changes (EDC) of a path represented by an array of points. + + The output value ranges from 0 to log2(bins). + + The Entropy of Directional Changes quantifies the unpredictability or randomness of the directional + changes in a given path. Higher entropy indicates more variation in the directions of the movement, + while lower entropy suggests more linear or predictable movement. + + The function works by calculating the change in direction between consecutive points, discretizing + those changes into bins, and then computing the Shannon entropy based on the probability distribution + of the directional changes. + + .. seealso:: + :func:`simba.mixins.timeseries_features_mixin.TimeseriesFeatureMixin.sliding_entropy_of_directional_changes` + + :param np.ndarray x: A 2D array of shape (N, 2) representing the path, where N is the number of points and each point has two spatial coordinates (e.g., x and y for 2D space). The path should be in the form of an array of consecutive (x, y) points. + :param int bins: The number of bins to discretize the directional changes. Default is 16 bins for angles between 0 and 360 degrees. A larger number of bins will increase the precision of direction change measurement. + :return: The entropy of the directional changes in the path. A higher value indicates more unpredictable or random direction changes, while a lower value indicates more predictable or linear movement. + :rtype: float + + :example: + >>> x = np.random.randint(0, 500, (100, 2)) + >>> TimeseriesFeatureMixin.entropy_of_directional_changes(x, 3) + """ + + check_int(name=f'{TimeseriesFeatureMixin.entropy_of_directional_changes.__name__} bins', value=bins) + check_valid_array(data=x, source=f'{TimeseriesFeatureMixin.entropy_of_directional_changes.__name__} x', accepted_ndims=(2,), accepted_axis_1_shape=[2, ], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + direction_vectors = np.diff(x, axis=0) + angles = np.arctan2(direction_vectors[:, 1], direction_vectors[:, 0]) * (180 / np.pi) + angles = (angles + 360) % 360 + angle_bins = np.linspace(0, 360, bins + 1) + digitized_angles = np.digitize(angles, angle_bins) - 1 + hist, _ = np.histogram(digitized_angles, bins=bins, range=(0, bins)) + hist = hist / hist.sum() + return np.max((0.0, -np.sum(hist * np.log2(hist + 1e-10)))) + + @staticmethod + def sliding_entropy_of_directional_changes(x: np.ndarray, + bins: int, + window_size: float, + sample_rate: float) -> np.ndarray: + """ + Computes a sliding window Entropy of Directional Changes (EDC) over a path represented by an array of points. + + The output value ranges from 0 to log2(bins). + + This function calculates the entropy of directional changes within a specified window, sliding across the entire path. + By analyzing the changes in direction over shorter segments (windows) of the path, it provides a dynamic view of + movement unpredictability or randomness along the path. Higher entropy within a window indicates more varied directional + changes, while lower entropy suggests more consistent directional movement within that segment. + + .. seealso:: + :func:`simba.mixins.timeseries_features_mixin.TimeseriesFeatureMixin.entropy_of_directional_changes` + + :param np.ndarray x: A 2D array of shape (N, 2) representing the path, where N is the number of points and each point has two spatial coordinates (e.g., x and y for 2D space). The path should be in the form of an array of consecutive (x, y) points. + :param int bins: The number of bins to discretize the directional changes. Default is 16 bins for angles between 0 and 360 degrees. A larger number of bins will increase the precision of direction change measurement. + :param float window_size: The duration of the sliding window, in seconds, over which to compute the entropy. + :param float sample_rate: The sampling rate (in frames per second) of the path data. This parameter converts `window_size` from seconds into frames, defining the number of consecutive points in each sliding window. + :return: A 1D numpy array of length N, where each element contains the entropy of directional changes for each frame, computed over the specified sliding window. Frames before the first full window contain NaN values. + :rtype: np.ndarray + + :example: + >>> x = np.random.randint(0, 100, (400, 2)) + >>> results = TimeseriesFeatureMixin.sliding_entropy_of_directional_changes(x=x, bins=16, window_size=5.0, sample_rate=30) + >>> x = pd.read_csv(r"C:\troubleshooting\two_black_animals_14bp\project_folder\csv\input_csv\Together_1.csv")[['Ear_left_1_x', 'Ear_left_1_y']].values + >>> results = TimeseriesFeatureMixin.sliding_entropy_of_directional_changes(x=x, bins=16, window_size=5.0, sample_rate=30) + """ + + direction_vectors = np.diff(x, axis=0) + angles = np.arctan2(direction_vectors[:, 1], direction_vectors[:, 0]) * (180 / np.pi) + angles = (angles + 360) % 360 + angle_bins = np.linspace(0, 360, bins + 1) + frame_step = int(max(1.0, window_size * sample_rate)) + results = np.full(shape=(x.shape[0]), fill_value=np.nan, dtype=np.float64) + for r in range(frame_step, direction_vectors.shape[0] + 1): + l = r - frame_step + sample_angles = angles[l:r] + digitized_angles = np.digitize(sample_angles, angle_bins) - 1 + hist, _ = np.histogram(digitized_angles, bins=bins, range=(0, bins)) + hist = hist / hist.sum() + results[r-1] = np.max((0.0, -np.sum(hist * np.log2(hist + 1e-10)))) + return results + + def path_curvature(x: np.ndarray, agg_type: Literal['mean', 'median', 'max'] = 'mean') -> float: + """ + Calculate aggregate curvature of a 2D path given an array of points. + + :param x: A 2D numpy array of shape (N, 2), where N is the number of points and each row is (x, y). + :param Literal['mean', 'median', 'max'] agg_type: The type of summary statistic to return. Options are 'mean', 'median', or 'max'. + :return: A single float value representing the path curvature based on the specified summary type. + :rtype: float + + :example: + >>> x = np.array([[0, 0], [1, 0.1], [2, 0.2], [3, 0.3], [4, 0.4]]) + >>> low = TimeseriesFeatureMixin.path_curvature(x) + >>> x = np.array([[0, 0], [1, 1], [2, 0], [3, 1], [4, 0]]) + >>> high = TimeseriesFeatureMixin.path_curvature(x) + """ + check_valid_array(data=x, source=f'{TimeseriesFeatureMixin.path_curvature.__name__} x', accepted_ndims=(2,), accepted_axis_1_shape=[2, ], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_str(name=f'{TimeseriesFeatureMixin.path_curvature.__name__} agg_type', value=agg_type, options=('mean', 'median', 'max')) + dx, dy = np.diff(x[:, 0]), np.diff(x[:, 1]) + x_prime, y_prime = dx[:-1], dy[:-1] + x_double_prime, y_double_prime = dx[1:] - dx[:-1], dy[1:] - dy[:-1] + curvature = np.abs(x_prime * y_double_prime - y_prime * x_double_prime) / (x_prime ** 2 + y_prime ** 2) ** ( + 3 / 2) + if agg_type == 'mean': + return np.float32(np.nanmean(curvature)) + elif agg_type == 'median': + return np.float32(np.nanmedian(curvature)) + else: + return np.float32(np.nanmax(curvature)) + + def sliding_path_curvature(x: np.ndarray, + agg_type: Literal['mean', 'median', 'max'], + window_size: float, + sample_rate: float) -> np.ndarray: + """ + Computes the curvature of a path over sliding windows along the path points, providing a measure of the path’s bending + or turning within each window. + + This function calculates curvature for each window segment by evaluating directional changes. It provides the option to + aggregate curvature values within each window using the mean, median, or maximum, depending on the desired level of + sensitivity to bends and turns. A higher curvature value indicates a sharper or more frequent directional change within + the window, while a lower curvature suggests a straighter or smoother path. + + :param x: A 2D array of shape (N, 2) representing the path, where N is the number of points, and each point has two spatial coordinates (e.g., x and y for 2D space). + :param Literal['mean', 'median', 'max'] agg_type: Type of aggregation for the curvature within each window. + :param float window_size: Duration of the window in seconds, used to define the size of each segment over which curvature is calculated. + :param float sample_rate: The rate at which path points were sampled (in points per second), used to convert the window size from seconds to frames + :return: An array of shape (N,) containing the computed curvature values for each window position along the path. Each element represents the aggregated curvature within a specific window, with `NaN` values for frames where the window does not fit. + :rtype: np.ndarray + + :example: + >>> x = np.random.randint(0, 500, (91, 2)) + >>> results = TimeseriesFeatureMixin.sliding_path_curvature(x=x, agg_type='mean', window_size=1, sample_rate=30) + """ + + frame_step = int(max(1.0, window_size * sample_rate)) + results = np.full(shape=(x.shape[0]), fill_value=np.nan, dtype=np.float32) + for r in range(frame_step, x.shape[0] + 1): + l = r - frame_step + sample_x = x[l:r] + dx, dy = np.diff(sample_x[:, 0]), np.diff(sample_x[:, 1]) + x_prime, y_prime = dx[:-1], dy[:-1] + x_double_prime, y_double_prime = dx[1:] - dx[:-1], dy[1:] - dy[:-1] + curvature = np.abs(x_prime * y_double_prime - y_prime * x_double_prime) / (x_prime ** 2 + y_prime ** 2) ** (3 / 2) + if agg_type == 'mean': + results[r - 1] = np.float32(np.nanmean(curvature)) + elif agg_type == 'median': + results[r - 1] = np.float32(np.nanmedian(curvature)) + else: + results[r - 1] = np.float32(np.nanmax(curvature)) + return results + + @staticmethod + def spatial_density(x: np.ndarray, + radius: float, + pixels_per_mm: float) -> float: + + """ + Computes the spatial density of trajectory points in a 2D array, based on the number of neighboring points + within a specified radius for each point in the trajectory. + + Spatial density provides insights into the movement pattern along a trajectory. Higher density values indicate + areas where points are closely packed, which can suggest slower movement, lingering, or frequent changes in + direction. Lower density values suggest more spread-out points, often associated with faster, more linear movement. + + :param np.ndarray x: A 2D array of shape (N, 2), where N is the number of points and each point has two spatial coordinates. + :param float radius: The radius within which to count neighboring points around each point. Defines the area of interest around each trajectory point. + :return: A single float value representing the average spatial density of the trajectory. + :rtype: float + + :example: + >>> x = np.array([[0, 0], [1, 1], [2, 2], [3, 3], [4, 4], [1, 0.5], [1.5, 1.5]]) + >>> density = TimeseriesFeatureMixin.spatial_density(x, pixels_per_mm=2.5, radius=5) + >>> high_density_points = np.array([[0, 0], [0.5, 0], [1, 0], [1.5, 0], [2, 0], [0, 0.5], [0.5, 0.5], [1, 0.5], [1.5, 0.5], [2, 0.5]]) + >>> low_density_points = np.array([[0, 0], [5, 5], [10, 10], [15, 15], [20, 20]]) + >>> high = TimeseriesFeatureMixin.spatial_density(x=high_density_points,radius=1, pixels_per_mm=1) + >>> low = TimeseriesFeatureMixin.spatial_density(x=low_density_points,radius=1, pixels_per_mm=1) + """ + + check_valid_array(data=x, source=f'{TimeseriesFeatureMixin.spatial_density.__name__} x', accepted_ndims=(2,), accepted_axis_1_shape=[2, ], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_float(name=f'{TimeseriesFeatureMixin.spatial_density.__name__} radius', value=radius) + check_float(name=f'{TimeseriesFeatureMixin.spatial_density.__name__} radius', value=pixels_per_mm) + pixel_radius = np.ceil(max(1.0, (radius * pixels_per_mm))) + n_points = x.shape[0] + total_neighbors = 0 + + for i in range(n_points): + distances = np.linalg.norm(x - x[i], axis=1) + neighbors = np.sum(distances <= pixel_radius) - 1 + total_neighbors += neighbors + + return total_neighbors / n_points + + @staticmethod + def sliding_spatial_density(x: np.ndarray, + radius: float, + pixels_per_mm: float, + window_size: float, + sample_rate: float) -> np.ndarray: + + """ + Computes the sliding spatial density of trajectory points in a 2D array, based on the number of neighboring points + within a specified radius, considering the density over a moving window of points. This function accounts for the + spatial scale in pixels per millimeter, providing a density measurement that is adjusted for the physical scale + of the trajectory. + + :param np.ndarray x: A 2D array of shape (N, 2), where N is the number of points and each point has two spatial coordinates (x, y). The array represents the trajectory path of points in a 2D space (e.g., x and y positions in space). + :param float radius: The radius (in millimeters) within which to count neighboring points around each trajectory point. Defines the area of interest around each point. + :param float pixels_per_mm: The scaling factor that converts the physical radius (in millimeters) to pixel units for spatial density calculations. + :param float window_size: The size of the sliding window (in seconds or points) to compute the density of points. A larger window size will consider more points in each density calculation. + :param float sample_rate: The rate at which to sample the trajectory points (e.g., frames per second or samples per unit time). It adjusts the granularity of the sliding window. + :return: A 1D numpy array where each element represents the computed spatial density for the trajectory at the corresponding point in time (or frame). Higher values indicate more densely packed points within the specified radius, while lower values suggest more sparsely distributed points. + :rtype: np.ndarray + + :example: + >>> x = np.random.randint(0, 20, (100, 2)) # Example trajectory with 100 points in 2D space + >>> results = TimeseriesFeatureMixin.sliding_spatial_density(x=x, radius=5.0, pixels_per_mm=10.0, window_size=1, sample_rate=31) + """ + + pixel_radius = np.ceil(max(1.0, (radius * pixels_per_mm))) + frame_window_size = int(np.ceil(max(1.0, (window_size * sample_rate)))) + results = np.full(shape=(x.shape[0]), fill_value=np.nan, dtype=np.float32) + for r in range(frame_window_size, x.shape[0] + 1): + l = r - frame_window_size + sample_x = x[l:r] + n_points, total_neighbors = sample_x.shape[0], 0 + for i in range(n_points): + distances = np.linalg.norm(sample_x - sample_x[i], axis=1) + neighbors = np.sum(distances <= pixel_radius) - 1 + total_neighbors += neighbors + results[r - 1] = total_neighbors / n_points + + return results diff --git a/simba/video_processors/video_processing.py b/simba/video_processors/video_processing.py index 9816a065f..d1f44121e 100644 --- a/simba/video_processors/video_processing.py +++ b/simba/video_processors/video_processing.py @@ -4208,7 +4208,7 @@ def rotate_video(video_path: Union[str, os.PathLike], """ Rotate a video or a directory of videos by a specified number of degrees. - .. video:: _static/img/rotate_video.webm + .. video:: _static/img/rotate_video.mp4 :width: 700 :loop: :autoplay: