From 25700a250c14c9b286b0e17bb87018271c8ea00b Mon Sep 17 00:00:00 2001 From: sronilsson Date: Wed, 20 Sep 2023 14:10:31 +0000 Subject: [PATCH] cleaned --- simba/mixins/feature_extraction_mixin.py | 467 ++++++++++++++--------- 1 file changed, 295 insertions(+), 172 deletions(-) diff --git a/simba/mixins/feature_extraction_mixin.py b/simba/mixins/feature_extraction_mixin.py index 950e872b4..54843507e 100644 --- a/simba/mixins/feature_extraction_mixin.py +++ b/simba/mixins/feature_extraction_mixin.py @@ -1,28 +1,31 @@ __author__ = "Simon Nilsson" import warnings + warnings.filterwarnings("ignore") +import glob +import math +import os +from typing import List, Optional + import numpy as np +import pandas as pd from numba import jit, njit, prange +from scipy import stats +from scipy.signal import find_peaks, savgol_filter from scipy.spatial import ConvexHull from scipy.spatial.qhull import QhullError -from scipy.signal import savgol_filter -import math -import os, glob -from scipy import stats -import pandas as pd -from scipy.signal import find_peaks -from typing import Optional, List - -from simba.utils.enums import Paths, Options -from simba.utils.checks import check_if_filepath_list_is_empty, check_file_exist_and_readable, check_minimum_roll_windows -from simba.utils.read_write import (read_project_path_and_file_type, - read_video_info_csv, - read_config_file, - get_bp_headers) -from simba.utils.errors import CountError, InvalidInputError import simba +from simba.utils.checks import (check_file_exist_and_readable, + check_if_filepath_list_is_empty, + check_minimum_roll_windows) +from simba.utils.enums import Options, Paths +from simba.utils.errors import CountError, InvalidInputError +from simba.utils.read_write import (get_bp_headers, read_config_file, + read_project_path_and_file_type, + read_video_info_csv) + class FeatureExtractionMixin(object): """ @@ -31,34 +34,48 @@ class FeatureExtractionMixin(object): :param Optional[configparser.Configparser] config_path: path to SimBA project_config.ini """ - def __init__(self, - config_path: Optional[str] = None): - + def __init__(self, config_path: Optional[str] = None): if config_path: self.config_path = config_path self.config = read_config_file(config_path=config_path) - self.project_path, self.file_type = read_project_path_and_file_type(config=self.config) - self.video_info_path = os.path.join(self.project_path, Paths.VIDEO_INFO.value) + self.project_path, self.file_type = read_project_path_and_file_type( + config=self.config + ) + self.video_info_path = os.path.join( + self.project_path, Paths.VIDEO_INFO.value + ) self.video_info_df = read_video_info_csv(file_path=self.video_info_path) - self.data_in_dir = os.path.join(self.project_path, Paths.OUTLIER_CORRECTED.value) - self.save_dir = os.path.join(self.project_path, Paths.FEATURES_EXTRACTED_DIR.value) - if not os.path.exists(self.save_dir): os.makedirs(self.save_dir) + self.data_in_dir = os.path.join( + self.project_path, Paths.OUTLIER_CORRECTED.value + ) + self.save_dir = os.path.join( + self.project_path, Paths.FEATURES_EXTRACTED_DIR.value + ) + if not os.path.exists(self.save_dir): + os.makedirs(self.save_dir) bp_path = os.path.join(self.project_path, Paths.BP_NAMES.value) check_file_exist_and_readable(file_path=bp_path) self.body_parts_lst = list(pd.read_csv(bp_path, header=None)[0]) - self.roll_windows_values = check_minimum_roll_windows(Options.ROLLING_WINDOW_DIVISORS.value, self.video_info_df['fps'].min()) - self.files_found = glob.glob(self.data_in_dir + '/*.' + self.file_type) - check_if_filepath_list_is_empty(filepaths=self.files_found, error_msg=f'No files of type {self.file_type} found in {self.data_in_dir}') + self.roll_windows_values = check_minimum_roll_windows( + Options.ROLLING_WINDOW_DIVISORS.value, self.video_info_df["fps"].min() + ) + self.files_found = glob.glob(self.data_in_dir + "/*." + self.file_type) + check_if_filepath_list_is_empty( + filepaths=self.files_found, + error_msg=f"No files of type {self.file_type} found in {self.data_in_dir}", + ) self.col_headers = get_bp_headers(body_parts_lst=self.body_parts_lst) - self.col_headers_shifted = [bp + '_shifted' for bp in self.col_headers] + self.col_headers_shifted = [bp + "_shifted" for bp in self.col_headers] @staticmethod @jit(nopython=True) - def euclidean_distance(bp_1_x: np.ndarray, - bp_2_x: np.ndarray, - bp_1_y: np.ndarray, - bp_2_y: np.ndarray, - px_per_mm: float) -> np.ndarray: + def euclidean_distance( + bp_1_x: np.ndarray, + bp_2_x: np.ndarray, + bp_1_y: np.ndarray, + bp_2_y: np.ndarray, + px_per_mm: float, + ) -> np.ndarray: """ Helper to compute the Euclidean distance in millimeters between two body-parts in all frames of a video @@ -80,15 +97,11 @@ def euclidean_distance(bp_1_x: np.ndarray, series = (np.sqrt((bp_1_x - bp_2_x) ** 2 + (bp_1_y - bp_2_y) ** 2)) / px_per_mm return series - @staticmethod @jit(nopython=True, fastmath=True, cache=True) - def angle3pt(ax: float, - ay: float, - bx: float, - by: float, - cx: float, - cy: float) -> float: + def angle3pt( + ax: float, ay: float, bx: float, by: float, cx: float, cy: float + ) -> float: """ Jitted helper for single frame 3-point angle. @@ -129,7 +142,9 @@ def angle3pt_serialized(data: np.ndarray) -> np.ndarray: results = np.full((data.shape[0]), 0.0) for i in prange(data.shape[0]): angle = math.degrees( - math.atan2(data[i][5] - data[i][3], data[i][4] - data[i][2]) - math.atan2(data[i][1] - data[i][3], data[i][0] - data[i][2])) + math.atan2(data[i][5] - data[i][3], data[i][4] - data[i][2]) + - math.atan2(data[i][1] - data[i][3], data[i][0] - data[i][2]) + ) if angle < 0: angle += 360 results[i] = angle @@ -195,15 +210,19 @@ def count_values_in_range(data: np.ndarray, ranges: np.ndarray) -> np.ndarray: for i in prange(data.shape[0]): for j in prange(ranges.shape[0]): lower_bound, upper_bound = ranges[j][0], ranges[j][1] - results[i][j] = data[i][np.logical_and(data[i] >= lower_bound, data[i] <= upper_bound)].shape[0] + results[i][j] = data[i][ + np.logical_and(data[i] >= lower_bound, data[i] <= upper_bound) + ].shape[0] return results @staticmethod @jit(nopython=True) - def framewise_euclidean_distance_roi(location_1: np.ndarray, - location_2: np.ndarray, - px_per_mm: float, - centimeter: bool = False) -> np.ndarray: + def framewise_euclidean_distance_roi( + location_1: np.ndarray, + location_2: np.ndarray, + px_per_mm: float, + centimeter: bool = False, + ) -> np.ndarray: """ Find frame-wise distances between a moving location (location_1) and static location (location_2) in millimeter or centimeter. @@ -237,8 +256,9 @@ def framewise_euclidean_distance_roi(location_1: np.ndarray, @staticmethod @jit(nopython=True) - def framewise_inside_rectangle_roi(bp_location: np.ndarray, - roi_coords: np.ndarray) -> np.ndarray: + def framewise_inside_rectangle_roi( + bp_location: np.ndarray, roi_coords: np.ndarray + ) -> np.ndarray: """ Jitted helper for frame-wise analysis if animal is inside static rectangular ROI. @@ -257,8 +277,14 @@ def framewise_inside_rectangle_roi(bp_location: np.ndarray, >>> [0, 0, 0, 0, 0, 0] """ results = np.full((bp_location.shape[0]), 0) - within_x_idx = np.argwhere((bp_location[:, 0] <= roi_coords[1][0]) & (bp_location[:, 0] >= roi_coords[0][0])).flatten() - within_y_idx = np.argwhere((bp_location[:, 1] <= roi_coords[1][1]) & (bp_location[:, 1] >= roi_coords[0][1])).flatten() + within_x_idx = np.argwhere( + (bp_location[:, 0] <= roi_coords[1][0]) + & (bp_location[:, 0] >= roi_coords[0][0]) + ).flatten() + within_y_idx = np.argwhere( + (bp_location[:, 1] <= roi_coords[1][1]) + & (bp_location[:, 1] >= roi_coords[0][1]) + ).flatten() for i in prange(within_x_idx.shape[0]): match = np.argwhere(within_y_idx == within_x_idx[i]) if match.shape[0] > 0: @@ -267,9 +293,9 @@ def framewise_inside_rectangle_roi(bp_location: np.ndarray, @staticmethod @jit(nopython=True) - def framewise_inside_polygon_roi(bp_location: np.ndarray, - roi_coords: np.ndarray) -> np.ndarray: - + def framewise_inside_polygon_roi( + bp_location: np.ndarray, roi_coords: np.ndarray + ) -> np.ndarray: """ Jitted helper for frame-wise detection if animal is inside static polygon ROI. @@ -300,7 +326,11 @@ def framewise_inside_polygon_roi(bp_location: np.ndarray, p1x, p1y = roi_coords[0] for j in prange(n + 1): p2x, p2y = roi_coords[j % n] - if (y > min(p1y, p2y)) and (y <= max(p1y, p2y)) and (x <= max(p1x, p2x)): + if ( + (y > min(p1y, p2y)) + and (y <= max(p1y, p2y)) + and (x <= max(p1x, p2x)) + ): if p1y != p2y: xints = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x if p1x == p2x or x <= xints: @@ -312,9 +342,9 @@ def framewise_inside_polygon_roi(bp_location: np.ndarray, return results @staticmethod - def windowed_frequentist_distribution_tests(data: np.ndarray, - feature_name: str, - fps: int) -> pd.DataFrame: + def windowed_frequentist_distribution_tests( + data: np.ndarray, feature_name: str, fps: int + ) -> pd.DataFrame: """ Calculates feature value distributions and feature peak counts in 1-s sequential time-bins. @@ -332,19 +362,26 @@ def windowed_frequentist_distribution_tests(data: np.ndarray, >>> FeatureExtractionMixin.windowed_frequentist_distribution_tests(data=feature_data, fps=25, feature_name='Anima_1_velocity') """ - ks_results, = np.full((data.shape[0]), -1.0), + (ks_results,) = (np.full((data.shape[0]), -1.0),) t_test_results = np.full((data.shape[0]), -1.0) shapiro_results = np.full((data.shape[0]), -1.0) peak_cnt_results = np.full((data.shape[0]), -1.0) for i in range(fps, data.shape[0] - fps, fps): bin_1_idx, bin_2_idx = [i - fps, i], [i, i + fps] - bin_1_data, bin_2_data = data[bin_1_idx[0]:bin_1_idx[1]], data[bin_2_idx[0]:bin_2_idx[1]] - ks_results[i:i + fps + 1] = stats.ks_2samp(data1=bin_1_data, data2=bin_2_data).statistic - t_test_results[i:i + fps + 1] = stats.ttest_ind(bin_1_data, bin_2_data).statistic + bin_1_data, bin_2_data = ( + data[bin_1_idx[0] : bin_1_idx[1]], + data[bin_2_idx[0] : bin_2_idx[1]], + ) + ks_results[i : i + fps + 1] = stats.ks_2samp( + data1=bin_1_data, data2=bin_2_data + ).statistic + t_test_results[i : i + fps + 1] = stats.ttest_ind( + bin_1_data, bin_2_data + ).statistic for i in range(0, data.shape[0] - fps, fps): - shapiro_results[i:i + fps + 1] = stats.shapiro(data[i:i + fps])[0] + shapiro_results[i : i + fps + 1] = stats.shapiro(data[i : i + fps])[0] rolling_idx = np.arange(fps)[None, :] + 1 * np.arange(data.shape[0])[:, None] for i in range(rolling_idx.shape[0]): @@ -352,17 +389,27 @@ def windowed_frequentist_distribution_tests(data: np.ndarray, peaks, _ = find_peaks(data[bin_start_idx:bin_end_idx], height=0) peak_cnt_results[i] = len(peaks) - columns = [f'{feature_name}_KS', f'{feature_name}_TTEST', - f'{feature_name}_SHAPIRO', f'{feature_name}_PEAK_CNT'] - - return pd.DataFrame( - np.column_stack((ks_results, t_test_results, shapiro_results, peak_cnt_results)), - columns=columns).round(4).fillna(0) + columns = [ + f"{feature_name}_KS", + f"{feature_name}_TTEST", + f"{feature_name}_SHAPIRO", + f"{feature_name}_PEAK_CNT", + ] + + return ( + pd.DataFrame( + np.column_stack( + (ks_results, t_test_results, shapiro_results, peak_cnt_results) + ), + columns=columns, + ) + .round(4) + .fillna(0) + ) @staticmethod @jit(nopython=True, cache=True) - def cdist(array_1: np.ndarray, - array_2: np.ndarray) -> np.ndarray: + def cdist(array_1: np.ndarray, array_2: np.ndarray) -> np.ndarray: """ Jitted analogue of meth:`scipy.cdist` for two 2D arrays. Use to calculate Euclidean distances between all coordinates in one array and all coordinates in a second array. E.g., computes the distances between @@ -410,7 +457,7 @@ def cdist_3d(data: np.ndarray) -> np.ndarray: return results @staticmethod - #@njit('(float32[:],)') + # @njit('(float32[:],)') def cosine_similarity(data: np.ndarray) -> np.ndarray: """ Jitted analogue of sklearn.metrics.pairwise import cosine_similarity. Similar to scipy.cdist. @@ -427,10 +474,8 @@ def cosine_similarity(data: np.ndarray) -> np.ndarray: similarity = dot_product / (norms * norms.T) return similarity - @staticmethod - def create_shifted_df(df: pd.DataFrame, - periods: int = 1) -> pd.DataFrame: + def create_shifted_df(df: pd.DataFrame, periods: int = 1) -> pd.DataFrame: """ Create dataframe including duplicated shifted (1) columns with ``_shifted`` suffix. @@ -447,8 +492,10 @@ def create_shifted_df(df: pd.DataFrame, """ data_df_shifted = df.shift(periods=periods) - data_df_shifted = data_df_shifted.combine_first(df).add_suffix('_shifted') - return pd.concat([df, data_df_shifted], axis=1, join='inner').reset_index(drop=True) + data_df_shifted = data_df_shifted.combine_first(df).add_suffix("_shifted") + return pd.concat([df, data_df_shifted], axis=1, join="inner").reset_index( + drop=True + ) def check_directionality_viable(self): """ @@ -464,16 +511,20 @@ def check_directionality_viable(self): direction_viable = True nose_cords, ear_left_cords, ear_right_cords = [], [], [] for animal_name in self.animal_bp_dict.keys(): - for bp_cord in ['X_bps', 'Y_bps']: + for bp_cord in ["X_bps", "Y_bps"]: bp_list = self.animal_bp_dict[animal_name][bp_cord] for bp_name in bp_list: - bp_name_components = bp_name.split('_') + bp_name_components = bp_name.split("_") bp_name_components = [x.lower() for x in bp_name_components] - if ('nose' in bp_name_components): + if "nose" in bp_name_components: nose_cords.append(bp_name) - elif ('ear' in bp_name_components) and ('left' in bp_name_components): + elif ("ear" in bp_name_components) and ( + "left" in bp_name_components + ): ear_left_cords.append(bp_name) - elif ('ear' in bp_name_components) and ('right' in bp_name_components): + elif ("ear" in bp_name_components) and ( + "right" in bp_name_components + ): ear_right_cords.append(bp_name) else: pass @@ -483,14 +534,22 @@ def check_directionality_viable(self): direction_viable = False if direction_viable: - nose_cords = [nose_cords[i * 2:(i + 1) * 2] for i in range((len(nose_cords) + 2 - 1) // 2)] - ear_left_cords = [ear_left_cords[i * 2:(i + 1) * 2] for i in range((len(ear_left_cords) + 2 - 1) // 2)] - ear_right_cords = [ear_right_cords[i * 2:(i + 1) * 2] for i in range((len(ear_right_cords) + 2 - 1) // 2)] + nose_cords = [ + nose_cords[i * 2 : (i + 1) * 2] + for i in range((len(nose_cords) + 2 - 1) // 2) + ] + ear_left_cords = [ + ear_left_cords[i * 2 : (i + 1) * 2] + for i in range((len(ear_left_cords) + 2 - 1) // 2) + ] + ear_right_cords = [ + ear_right_cords[i * 2 : (i + 1) * 2] + for i in range((len(ear_right_cords) + 2 - 1) // 2) + ] return direction_viable, nose_cords, ear_left_cords, ear_right_cords - def get_feature_extraction_headers(self, - pose: str) -> List[str]: + def get_feature_extraction_headers(self, pose: str) -> List[str]: """ Helper to return the headers names (body-part location columns) that should be used during feature extraction. @@ -498,17 +557,21 @@ def get_feature_extraction_headers(self, :return List[str]: The names and order of the pose-estimation columns. """ simba_dir = os.path.dirname(simba.__file__) - feature_categories_csv_path = os.path.join(simba_dir, Paths.SIMBA_FEATURE_EXTRACTION_COL_NAMES_PATH.value) + feature_categories_csv_path = os.path.join( + simba_dir, Paths.SIMBA_FEATURE_EXTRACTION_COL_NAMES_PATH.value + ) check_file_exist_and_readable(file_path=feature_categories_csv_path) bps = list(pd.read_csv(feature_categories_csv_path)[pose]) - return [x for x in bps if str(x) != 'nan'] + return [x for x in bps if str(x) != "nan"] @staticmethod @jit(nopython=True) - def jitted_line_crosses_to_nonstatic_targets(left_ear_array: np.ndarray, - right_ear_array: np.ndarray, - nose_array: np.ndarray, - target_array: np.ndarray) -> np.ndarray: + def jitted_line_crosses_to_nonstatic_targets( + left_ear_array: np.ndarray, + right_ear_array: np.ndarray, + nose_array: np.ndarray, + target_array: np.ndarray, + ) -> np.ndarray: """ Jitted helper to calculate if an animal is directing towards another animals body-part coordinate, given the target body-part and the left ear, right ear, and nose coordinates of the observer. @@ -543,10 +606,20 @@ def jitted_line_crosses_to_nonstatic_targets(left_ear_array: np.ndarray, Ph = np.sqrt(Px * Px + Py * Py) Qh = np.sqrt(Qx * Qx + Qy * Qy) Nh = np.sqrt(Nx * Nx + Ny * Ny) - if (Nh < Ph and Nh < Qh and Qh < Ph): - results_array[frame_no] = [0, right_ear_array[frame_no][0], right_ear_array[frame_no][1], True] - elif (Nh < Ph and Nh < Qh and Ph < Qh): - results_array[frame_no] = [1, left_ear_array[frame_no][0], left_ear_array[frame_no][1], True] + if Nh < Ph and Nh < Qh and Qh < Ph: + results_array[frame_no] = [ + 0, + right_ear_array[frame_no][0], + right_ear_array[frame_no][1], + True, + ] + elif Nh < Ph and Nh < Qh and Ph < Qh: + results_array[frame_no] = [ + 1, + left_ear_array[frame_no][0], + left_ear_array[frame_no][1], + True, + ] else: results_array[frame_no] = [2, -1, -1, False] @@ -554,10 +627,12 @@ def jitted_line_crosses_to_nonstatic_targets(left_ear_array: np.ndarray, @staticmethod @jit(nopython=True) - def jitted_line_crosses_to_static_targets(left_ear_array: np.ndarray, - right_ear_array: np.ndarray, - nose_array: np.ndarray, - target_array: np.ndarray) -> np.ndarray: + def jitted_line_crosses_to_static_targets( + left_ear_array: np.ndarray, + right_ear_array: np.ndarray, + nose_array: np.ndarray, + target_array: np.ndarray, + ) -> np.ndarray: """ Jitted helper to calculate if an animal is directing towards a static location (ROI centroid), given the target location and the left ear, right ear, and nose coordinates of the observer. @@ -594,10 +669,20 @@ def jitted_line_crosses_to_static_targets(left_ear_array: np.ndarray, Ph = np.sqrt(Px * Px + Py * Py) Qh = np.sqrt(Qx * Qx + Qy * Qy) Nh = np.sqrt(Nx * Nx + Ny * Ny) - if (Nh < Ph and Nh < Qh and Qh < Ph): - results_array[frame_no] = [0, right_ear_array[frame_no][0], right_ear_array[frame_no][1], True] - elif (Nh < Ph and Nh < Qh and Ph < Qh): - results_array[frame_no] = [1, left_ear_array[frame_no][0], left_ear_array[frame_no][1], True] + if Nh < Ph and Nh < Qh and Qh < Ph: + results_array[frame_no] = [ + 0, + right_ear_array[frame_no][0], + right_ear_array[frame_no][1], + True, + ] + elif Nh < Ph and Nh < Qh and Ph < Qh: + results_array[frame_no] = [ + 1, + left_ear_array[frame_no][0], + left_ear_array[frame_no][1], + True, + ] else: results_array[frame_no] = [2, -1, -1, False] @@ -605,7 +690,6 @@ def jitted_line_crosses_to_static_targets(left_ear_array: np.ndarray, @staticmethod def minimum_bounding_rectangle(points: np.ndarray) -> np.ndarray: - """ Finds the minimum bounding rectangle from convex hull vertices. @@ -629,17 +713,23 @@ def minimum_bounding_rectangle(points: np.ndarray) -> np.ndarray: >>> [[10.7260274 , 3.39726027], [ 1.4109589 , -0.09589041], [-0.31506849, 4.50684932], [ 9., 8. ]] """ - pi2 = np.pi / 2. + pi2 = np.pi / 2.0 hull_points = points[ConvexHull(points).vertices] edges = hull_points[1:] - hull_points[:-1] angles = np.arctan2(edges[:, 1], edges[:, 0]) angles = np.abs(np.mod(angles, pi2)) angles = np.unique(angles) - rotations = np.vstack([np.cos(angles), np.cos(angles - pi2), np.cos(angles + pi2), np.cos(angles)]).T + rotations = np.vstack( + [np.cos(angles), np.cos(angles - pi2), np.cos(angles + pi2), np.cos(angles)] + ).T rotations = rotations.reshape((-1, 2, 2)) rot_points = np.dot(rotations, hull_points.T) - min_x, max_x = np.nanmin(rot_points[:, 0], axis=1), np.nanmax(rot_points[:, 0], axis=1) - min_y, max_y = np.nanmin(rot_points[:, 1], axis=1), np.nanmax(rot_points[:, 1], axis=1) + min_x, max_x = np.nanmin(rot_points[:, 0], axis=1), np.nanmax( + rot_points[:, 0], axis=1 + ) + min_y, max_y = np.nanmin(rot_points[:, 1], axis=1), np.nanmax( + rot_points[:, 1], axis=1 + ) areas = (max_x - min_x) * (max_y - min_y) best_idx = np.argmin(areas) x1, x2 = max_x[best_idx], min_x[best_idx] @@ -650,13 +740,14 @@ def minimum_bounding_rectangle(points: np.ndarray) -> np.ndarray: rval[2], rval[3] = np.dot([x2, y1], r), np.dot([x1, y1], r) return rval - @staticmethod @jit(nopython=True) - def framewise_euclidean_distance(location_1: np.ndarray, - location_2: np.ndarray, - px_per_mm: float, - centimeter: bool = False) -> np.ndarray: + def framewise_euclidean_distance( + location_1: np.ndarray, + location_2: np.ndarray, + px_per_mm: float, + centimeter: bool = False, + ) -> np.ndarray: """ Jitted helper finding frame-wise distances between two moving locations in millimeter or centimeter. @@ -703,26 +794,28 @@ def framewise_euclidean_distance(location_1: np.ndarray, # # return results.astype(int) - - def change_in_bodypart_euclidean_distance(self, - location_1: np.ndarray, - location_2: np.ndarray, - fps: int, - px_per_mm: float, - time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6])) -> np.ndarray: + def change_in_bodypart_euclidean_distance( + self, + location_1: np.ndarray, + location_2: np.ndarray, + fps: int, + px_per_mm: float, + time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6]), + ) -> np.ndarray: """ Computes the difference between the distances of two body-parts in the current frame versus N.N seconds ago. Used for computing if animal body-parts are traveling away or towards each other within defined time-windows. """ - distances = self.framewise_euclidean_distance(location_1=location_1, location_2=location_2, px_per_mm=px_per_mm) - return self._relative_distances(distances=distances, fps=fps, time_windows=time_windows) + distances = self.framewise_euclidean_distance( + location_1=location_1, location_2=location_2, px_per_mm=px_per_mm + ) + return self._relative_distances( + distances=distances, fps=fps, time_windows=time_windows + ) - - - def dataframe_gaussian_smoother(self, - df: pd.DataFrame, - fps: int, - time_window: int=100) -> pd.DataFrame: + def dataframe_gaussian_smoother( + self, df: pd.DataFrame, fps: int, time_window: int = 100 + ) -> pd.DataFrame: """ Column-wise Gaussian smoothing of dataframe. @@ -737,14 +830,21 @@ def dataframe_gaussian_smoother(self, frames_in_time_window = int(time_window / (1000 / fps)) for c in df.columns: - df[c] = df[c].rolling(window=int(frames_in_time_window), win_type='gaussian', center=True).mean(std=5).fillna(df[c]).abs().astype(int) + df[c] = ( + df[c] + .rolling( + window=int(frames_in_time_window), win_type="gaussian", center=True + ) + .mean(std=5) + .fillna(df[c]) + .abs() + .astype(int) + ) return df - - def dataframe_savgol_smoother(self, - df: pd.DataFrame, - fps: int, - time_window: int = 150) -> pd.DataFrame: + def dataframe_savgol_smoother( + self, df: pd.DataFrame, fps: int, time_window: int = 150 + ) -> pd.DataFrame: """ Column-wise Savitzky-Golay smoothing of dataframe. @@ -758,10 +858,17 @@ def dataframe_savgol_smoother(self, """ frames_in_time_window = int(time_window / (1000 / fps)) - if (frames_in_time_window % 2) == 0: frames_in_time_window = frames_in_time_window - 1 - if (frames_in_time_window % 2) <= 3: frames_in_time_window = 5 + if (frames_in_time_window % 2) == 0: + frames_in_time_window = frames_in_time_window - 1 + if (frames_in_time_window % 2) <= 3: + frames_in_time_window = 5 for c in df.columns: - df[c] = savgol_filter(x=df[c].to_numpy(), window_length=frames_in_time_window, polyorder=3, mode='nearest').astype(int) + df[c] = savgol_filter( + x=df[c].to_numpy(), + window_length=frames_in_time_window, + polyorder=3, + mode="nearest", + ).astype(int) return df def get_bp_headers(self) -> None: @@ -770,11 +877,10 @@ def get_bp_headers(self) -> None: """ self.col_headers = [] for bp in self.body_parts_lst: - c1, c2, c3 = (f'{bp}_x', f'{bp}_y', f'{bp}_p') + c1, c2, c3 = (f"{bp}_x", f"{bp}_y", f"{bp}_p") self.col_headers.extend((c1, c2, c3)) def check_directionality_cords(self) -> dict: - """ Helper to check if ear and nose body-parts are present within the pose-estimation data. @@ -785,47 +891,66 @@ def check_directionality_cords(self) -> dict: results = {} for animal in self.animal_bp_dict.keys(): results[animal] = {} - results[animal]['Nose'] = {} - results[animal]['Ear_left'] = {} - results[animal]['Ear_right'] = {} - for dimension in ['X_bps', 'Y_bps']: + results[animal]["Nose"] = {} + results[animal]["Ear_left"] = {} + results[animal]["Ear_right"] = {} + for dimension in ["X_bps", "Y_bps"]: for cord in self.animal_bp_dict[animal][dimension]: if ("nose" in cord.lower()) and ("x" in cord.lower()): - results[animal]['Nose']['X_bps'] = cord + results[animal]["Nose"]["X_bps"] = cord elif ("nose" in cord.lower()) and ("y" in cord.lower()): - results[animal]['Nose']['Y_bps'] = cord - elif ("left" in cord.lower()) and ("x" in cord.lower()) and ("ear" in cord.lower()): - results[animal]['Ear_left']['X_bps'] = cord - elif ("left" in cord.lower()) and ("Y".lower() in cord.lower()) and ("ear".lower() in cord.lower()): - results[animal]['Ear_left']['Y_bps'] = cord - elif ("right" in cord.lower()) and ("x" in cord.lower()) and ("ear" in cord.lower()): - results[animal]['Ear_right']['X_bps'] = cord - elif ("right" in cord.lower()) and ("y" in cord.lower()) and ("ear".lower() in cord.lower()): - results[animal]['Ear_right']['Y_bps'] = cord + results[animal]["Nose"]["Y_bps"] = cord + elif ( + ("left" in cord.lower()) + and ("x" in cord.lower()) + and ("ear" in cord.lower()) + ): + results[animal]["Ear_left"]["X_bps"] = cord + elif ( + ("left" in cord.lower()) + and ("Y".lower() in cord.lower()) + and ("ear".lower() in cord.lower()) + ): + results[animal]["Ear_left"]["Y_bps"] = cord + elif ( + ("right" in cord.lower()) + and ("x" in cord.lower()) + and ("ear" in cord.lower()) + ): + results[animal]["Ear_right"]["X_bps"] = cord + elif ( + ("right" in cord.lower()) + and ("y" in cord.lower()) + and ("ear".lower() in cord.lower()) + ): + results[animal]["Ear_right"]["Y_bps"] = cord return results - def insert_default_headers_for_feature_extraction(self, - df: pd.DataFrame, - headers: List[str], - pose_config: str, - filename: str) -> pd.DataFrame: + def insert_default_headers_for_feature_extraction( + self, df: pd.DataFrame, headers: List[str], pose_config: str, filename: str + ) -> pd.DataFrame: """ Helper to insert correct body-part column names prior to defualt feature extraction methods. """ if len(headers) != len(df.columns): - raise CountError(f'Your SimBA project is set to using the default {pose_config} pose-configuration. ' - f'SimBA therefore expects {str(len(headers))} columns of data inside the files within the project_folder. However, ' - f'within file {filename} file, SimBA found {str(len(df.columns))} columns.', source=self.__class__.__name__) + raise CountError( + f"Your SimBA project is set to using the default {pose_config} pose-configuration. " + f"SimBA therefore expects {str(len(headers))} columns of data inside the files within the project_folder. However, " + f"within file {filename} file, SimBA found {str(len(df.columns))} columns.", + source=self.__class__.__name__, + ) else: df.columns = headers return df @staticmethod - def line_crosses_to_static_targets(p: List[float], - q: List[float], - n: List[float], - M: List[float], - coord: List[float]) -> (bool, List[float]): + def line_crosses_to_static_targets( + p: List[float], + q: List[float], + n: List[float], + M: List[float], + coord: List[float], + ) -> (bool, List[float]): """ Legacy non-jitted helper to calculate if an animal is directing towards a static coordinate (e.g., ROI centroid). @@ -851,19 +976,17 @@ def line_crosses_to_static_targets(p: List[float], Ph = np.sqrt(Px * Px + Py * Py) Qh = np.sqrt(Qx * Qx + Qy * Qy) Nh = np.sqrt(Nx * Nx + Ny * Ny) - if (Nh < Ph and Nh < Qh and Qh < Ph): + if Nh < Ph and Nh < Qh and Qh < Ph: coord.extend((q[0], q[1])) return True, coord - elif (Nh < Ph and Nh < Qh and Ph < Qh): + elif Nh < Ph and Nh < Qh and Ph < Qh: coord.extend((p[0], p[1])) return True, coord else: return False, coord - - -#cdist_3d +# cdist_3d # # array_1 = np.array([[[100, 200], [301, 198], [234, 512]], @@ -878,4 +1001,4 @@ def line_crosses_to_static_targets(p: List[float], # # points = np.random.randint(1, 10, size=(10, 2)) -# FeatureExtractionMixin.minimum_bounding_rectangle(points=points) \ No newline at end of file +# FeatureExtractionMixin.minimum_bounding_rectangle(points=points)