diff --git a/simba/mixins/feature_extraction_mixin.py b/simba/mixins/feature_extraction_mixin.py index bd4f73e71..950e872b4 100644 --- a/simba/mixins/feature_extraction_mixin.py +++ b/simba/mixins/feature_extraction_mixin.py @@ -1,31 +1,28 @@ __author__ = "Simon Nilsson" import warnings - warnings.filterwarnings("ignore") -import glob -import math -import os -from typing import List, Optional - import numpy as np -import pandas as pd -from numba import jit, prange -from scipy import stats -from scipy.signal import find_peaks, savgol_filter +from numba import jit, njit, prange from scipy.spatial import ConvexHull from scipy.spatial.qhull import QhullError +from scipy.signal import savgol_filter +import math +import os, glob +from scipy import stats +import pandas as pd +from scipy.signal import find_peaks +from typing import Optional, List -import simba -from simba.utils.checks import (check_file_exist_and_readable, - check_if_filepath_list_is_empty, - check_minimum_roll_windows) -from simba.utils.enums import Options, Paths -from simba.utils.errors import CountError, InvalidInputError -from simba.utils.read_write import (get_bp_headers, read_config_file, - read_project_path_and_file_type, - read_video_info_csv) +from simba.utils.enums import Paths, Options +from simba.utils.checks import check_if_filepath_list_is_empty, check_file_exist_and_readable, check_minimum_roll_windows +from simba.utils.read_write import (read_project_path_and_file_type, + read_video_info_csv, + read_config_file, + get_bp_headers) +from simba.utils.errors import CountError, InvalidInputError +import simba class FeatureExtractionMixin(object): """ @@ -34,48 +31,34 @@ class FeatureExtractionMixin(object): :param Optional[configparser.Configparser] config_path: path to SimBA project_config.ini """ - def __init__(self, config_path: Optional[str] = None): + def __init__(self, + config_path: Optional[str] = None): + if config_path: self.config_path = config_path self.config = read_config_file(config_path=config_path) - self.project_path, self.file_type = read_project_path_and_file_type( - config=self.config - ) - self.video_info_path = os.path.join( - self.project_path, Paths.VIDEO_INFO.value - ) + self.project_path, self.file_type = read_project_path_and_file_type(config=self.config) + self.video_info_path = os.path.join(self.project_path, Paths.VIDEO_INFO.value) self.video_info_df = read_video_info_csv(file_path=self.video_info_path) - self.data_in_dir = os.path.join( - self.project_path, Paths.OUTLIER_CORRECTED.value - ) - self.save_dir = os.path.join( - self.project_path, Paths.FEATURES_EXTRACTED_DIR.value - ) - if not os.path.exists(self.save_dir): - os.makedirs(self.save_dir) + self.data_in_dir = os.path.join(self.project_path, Paths.OUTLIER_CORRECTED.value) + self.save_dir = os.path.join(self.project_path, Paths.FEATURES_EXTRACTED_DIR.value) + if not os.path.exists(self.save_dir): os.makedirs(self.save_dir) bp_path = os.path.join(self.project_path, Paths.BP_NAMES.value) check_file_exist_and_readable(file_path=bp_path) self.body_parts_lst = list(pd.read_csv(bp_path, header=None)[0]) - self.roll_windows_values = check_minimum_roll_windows( - Options.ROLLING_WINDOW_DIVISORS.value, self.video_info_df["fps"].min() - ) - self.files_found = glob.glob(self.data_in_dir + "/*." + self.file_type) - check_if_filepath_list_is_empty( - filepaths=self.files_found, - error_msg=f"No files of type {self.file_type} found in {self.data_in_dir}", - ) + self.roll_windows_values = check_minimum_roll_windows(Options.ROLLING_WINDOW_DIVISORS.value, self.video_info_df['fps'].min()) + self.files_found = glob.glob(self.data_in_dir + '/*.' + self.file_type) + check_if_filepath_list_is_empty(filepaths=self.files_found, error_msg=f'No files of type {self.file_type} found in {self.data_in_dir}') self.col_headers = get_bp_headers(body_parts_lst=self.body_parts_lst) - self.col_headers_shifted = [bp + "_shifted" for bp in self.col_headers] + self.col_headers_shifted = [bp + '_shifted' for bp in self.col_headers] @staticmethod @jit(nopython=True) - def euclidean_distance( - bp_1_x: np.ndarray, - bp_2_x: np.ndarray, - bp_1_y: np.ndarray, - bp_2_y: np.ndarray, - px_per_mm: float, - ) -> np.ndarray: + def euclidean_distance(bp_1_x: np.ndarray, + bp_2_x: np.ndarray, + bp_1_y: np.ndarray, + bp_2_y: np.ndarray, + px_per_mm: float) -> np.ndarray: """ Helper to compute the Euclidean distance in millimeters between two body-parts in all frames of a video @@ -97,11 +80,15 @@ def euclidean_distance( series = (np.sqrt((bp_1_x - bp_2_x) ** 2 + (bp_1_y - bp_2_y) ** 2)) / px_per_mm return series + @staticmethod @jit(nopython=True, fastmath=True, cache=True) - def angle3pt( - ax: float, ay: float, bx: float, by: float, cx: float, cy: float - ) -> float: + def angle3pt(ax: float, + ay: float, + bx: float, + by: float, + cx: float, + cy: float) -> float: """ Jitted helper for single frame 3-point angle. @@ -142,9 +129,7 @@ def angle3pt_serialized(data: np.ndarray) -> np.ndarray: results = np.full((data.shape[0]), 0.0) for i in prange(data.shape[0]): angle = math.degrees( - math.atan2(data[i][5] - data[i][3], data[i][4] - data[i][2]) - - math.atan2(data[i][1] - data[i][3], data[i][0] - data[i][2]) - ) + math.atan2(data[i][5] - data[i][3], data[i][4] - data[i][2]) - math.atan2(data[i][1] - data[i][3], data[i][0] - data[i][2])) if angle < 0: angle += 360 results[i] = angle @@ -210,19 +195,15 @@ def count_values_in_range(data: np.ndarray, ranges: np.ndarray) -> np.ndarray: for i in prange(data.shape[0]): for j in prange(ranges.shape[0]): lower_bound, upper_bound = ranges[j][0], ranges[j][1] - results[i][j] = data[i][ - np.logical_and(data[i] >= lower_bound, data[i] <= upper_bound) - ].shape[0] + results[i][j] = data[i][np.logical_and(data[i] >= lower_bound, data[i] <= upper_bound)].shape[0] return results @staticmethod @jit(nopython=True) - def framewise_euclidean_distance_roi( - location_1: np.ndarray, - location_2: np.ndarray, - px_per_mm: float, - centimeter: bool = False, - ) -> np.ndarray: + def framewise_euclidean_distance_roi(location_1: np.ndarray, + location_2: np.ndarray, + px_per_mm: float, + centimeter: bool = False) -> np.ndarray: """ Find frame-wise distances between a moving location (location_1) and static location (location_2) in millimeter or centimeter. @@ -256,9 +237,8 @@ def framewise_euclidean_distance_roi( @staticmethod @jit(nopython=True) - def framewise_inside_rectangle_roi( - bp_location: np.ndarray, roi_coords: np.ndarray - ) -> np.ndarray: + def framewise_inside_rectangle_roi(bp_location: np.ndarray, + roi_coords: np.ndarray) -> np.ndarray: """ Jitted helper for frame-wise analysis if animal is inside static rectangular ROI. @@ -277,14 +257,8 @@ def framewise_inside_rectangle_roi( >>> [0, 0, 0, 0, 0, 0] """ results = np.full((bp_location.shape[0]), 0) - within_x_idx = np.argwhere( - (bp_location[:, 0] <= roi_coords[1][0]) - & (bp_location[:, 0] >= roi_coords[0][0]) - ).flatten() - within_y_idx = np.argwhere( - (bp_location[:, 1] <= roi_coords[1][1]) - & (bp_location[:, 1] >= roi_coords[0][1]) - ).flatten() + within_x_idx = np.argwhere((bp_location[:, 0] <= roi_coords[1][0]) & (bp_location[:, 0] >= roi_coords[0][0])).flatten() + within_y_idx = np.argwhere((bp_location[:, 1] <= roi_coords[1][1]) & (bp_location[:, 1] >= roi_coords[0][1])).flatten() for i in prange(within_x_idx.shape[0]): match = np.argwhere(within_y_idx == within_x_idx[i]) if match.shape[0] > 0: @@ -293,9 +267,9 @@ def framewise_inside_rectangle_roi( @staticmethod @jit(nopython=True) - def framewise_inside_polygon_roi( - bp_location: np.ndarray, roi_coords: np.ndarray - ) -> np.ndarray: + def framewise_inside_polygon_roi(bp_location: np.ndarray, + roi_coords: np.ndarray) -> np.ndarray: + """ Jitted helper for frame-wise detection if animal is inside static polygon ROI. @@ -326,11 +300,7 @@ def framewise_inside_polygon_roi( p1x, p1y = roi_coords[0] for j in prange(n + 1): p2x, p2y = roi_coords[j % n] - if ( - (y > min(p1y, p2y)) - and (y <= max(p1y, p2y)) - and (x <= max(p1x, p2x)) - ): + if (y > min(p1y, p2y)) and (y <= max(p1y, p2y)) and (x <= max(p1x, p2x)): if p1y != p2y: xints = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x if p1x == p2x or x <= xints: @@ -342,9 +312,9 @@ def framewise_inside_polygon_roi( return results @staticmethod - def windowed_frequentist_distribution_tests( - data: np.ndarray, feature_name: str, fps: int - ) -> pd.DataFrame: + def windowed_frequentist_distribution_tests(data: np.ndarray, + feature_name: str, + fps: int) -> pd.DataFrame: """ Calculates feature value distributions and feature peak counts in 1-s sequential time-bins. @@ -362,26 +332,19 @@ def windowed_frequentist_distribution_tests( >>> FeatureExtractionMixin.windowed_frequentist_distribution_tests(data=feature_data, fps=25, feature_name='Anima_1_velocity') """ - (ks_results,) = (np.full((data.shape[0]), -1.0),) + ks_results, = np.full((data.shape[0]), -1.0), t_test_results = np.full((data.shape[0]), -1.0) shapiro_results = np.full((data.shape[0]), -1.0) peak_cnt_results = np.full((data.shape[0]), -1.0) for i in range(fps, data.shape[0] - fps, fps): bin_1_idx, bin_2_idx = [i - fps, i], [i, i + fps] - bin_1_data, bin_2_data = ( - data[bin_1_idx[0] : bin_1_idx[1]], - data[bin_2_idx[0] : bin_2_idx[1]], - ) - ks_results[i : i + fps + 1] = stats.ks_2samp( - data1=bin_1_data, data2=bin_2_data - ).statistic - t_test_results[i : i + fps + 1] = stats.ttest_ind( - bin_1_data, bin_2_data - ).statistic + bin_1_data, bin_2_data = data[bin_1_idx[0]:bin_1_idx[1]], data[bin_2_idx[0]:bin_2_idx[1]] + ks_results[i:i + fps + 1] = stats.ks_2samp(data1=bin_1_data, data2=bin_2_data).statistic + t_test_results[i:i + fps + 1] = stats.ttest_ind(bin_1_data, bin_2_data).statistic for i in range(0, data.shape[0] - fps, fps): - shapiro_results[i : i + fps + 1] = stats.shapiro(data[i : i + fps])[0] + shapiro_results[i:i + fps + 1] = stats.shapiro(data[i:i + fps])[0] rolling_idx = np.arange(fps)[None, :] + 1 * np.arange(data.shape[0])[:, None] for i in range(rolling_idx.shape[0]): @@ -389,29 +352,25 @@ def windowed_frequentist_distribution_tests( peaks, _ = find_peaks(data[bin_start_idx:bin_end_idx], height=0) peak_cnt_results[i] = len(peaks) - columns = [ - f"{feature_name}_KS", - f"{feature_name}_TTEST", - f"{feature_name}_SHAPIRO", - f"{feature_name}_PEAK_CNT", - ] - - return ( - pd.DataFrame( - np.column_stack( - (ks_results, t_test_results, shapiro_results, peak_cnt_results) - ), - columns=columns, - ) - .round(4) - .fillna(0) - ) + columns = [f'{feature_name}_KS', f'{feature_name}_TTEST', + f'{feature_name}_SHAPIRO', f'{feature_name}_PEAK_CNT'] + + return pd.DataFrame( + np.column_stack((ks_results, t_test_results, shapiro_results, peak_cnt_results)), + columns=columns).round(4).fillna(0) @staticmethod @jit(nopython=True, cache=True) - def cdist(array_1: np.ndarray, array_2: np.ndarray) -> np.ndarray: + def cdist(array_1: np.ndarray, + array_2: np.ndarray) -> np.ndarray: """ - Jitted analogue of meth:`scipy.cdist` for two 2D arrays. + Jitted analogue of meth:`scipy.cdist` for two 2D arrays. Use to calculate Euclidean distances between + all coordinates in one array and all coordinates in a second array. E.g., computes the distances between + all body-parts of one animal and all body-parts of a second animal. + + .. image:: _static/img/cdist.png + :width: 600 + :align: center :parameter np.ndarray array_1: 2D array of body-part coordinates :parameter np.ndarray array_2: 2D array of body-part coordinates @@ -435,7 +394,8 @@ def cdist(array_1: np.ndarray, array_2: np.ndarray) -> np.ndarray: @jit(nopython=True) def cdist_3d(data: np.ndarray) -> np.ndarray: """ - Jitted analogue of meth:`scipy.cdist` for 3D array. + Jitted analogue of meth:`scipy.cdist` for 3D array. Use to calculate Euclidean distances between + all coordinates in of one array and itself. :parameter np.ndarray data: 3D array of body-part coordinates of size len(frames) x -1 x 2. :return np.ndarray: 3D array of size data.shape[0], data.shape[1], data.shape[1]. @@ -450,7 +410,27 @@ def cdist_3d(data: np.ndarray) -> np.ndarray: return results @staticmethod - def create_shifted_df(df: pd.DataFrame, periods: int = 1) -> pd.DataFrame: + #@njit('(float32[:],)') + def cosine_similarity(data: np.ndarray) -> np.ndarray: + """ + Jitted analogue of sklearn.metrics.pairwise import cosine_similarity. Similar to scipy.cdist. + calculates the cosine similarity between all pairs in 2D array. + + :example: + >>> data = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]).astype(np.float32) + >>> FeatureExtractionMixin().cosine_similarity(data=data) + >>> [[1.0, 0.974, 0.959][0.974, 1.0, 0.998] [0.959, 0.998, 1.0] + """ + + dot_product = np.dot(data, data.T) + norms = np.linalg.norm(data, axis=1).reshape(-1, 1) + similarity = dot_product / (norms * norms.T) + return similarity + + + @staticmethod + def create_shifted_df(df: pd.DataFrame, + periods: int = 1) -> pd.DataFrame: """ Create dataframe including duplicated shifted (1) columns with ``_shifted`` suffix. @@ -467,10 +447,8 @@ def create_shifted_df(df: pd.DataFrame, periods: int = 1) -> pd.DataFrame: """ data_df_shifted = df.shift(periods=periods) - data_df_shifted = data_df_shifted.combine_first(df).add_suffix("_shifted") - return pd.concat([df, data_df_shifted], axis=1, join="inner").reset_index( - drop=True - ) + data_df_shifted = data_df_shifted.combine_first(df).add_suffix('_shifted') + return pd.concat([df, data_df_shifted], axis=1, join='inner').reset_index(drop=True) def check_directionality_viable(self): """ @@ -486,20 +464,16 @@ def check_directionality_viable(self): direction_viable = True nose_cords, ear_left_cords, ear_right_cords = [], [], [] for animal_name in self.animal_bp_dict.keys(): - for bp_cord in ["X_bps", "Y_bps"]: + for bp_cord in ['X_bps', 'Y_bps']: bp_list = self.animal_bp_dict[animal_name][bp_cord] for bp_name in bp_list: - bp_name_components = bp_name.split("_") + bp_name_components = bp_name.split('_') bp_name_components = [x.lower() for x in bp_name_components] - if "nose" in bp_name_components: + if ('nose' in bp_name_components): nose_cords.append(bp_name) - elif ("ear" in bp_name_components) and ( - "left" in bp_name_components - ): + elif ('ear' in bp_name_components) and ('left' in bp_name_components): ear_left_cords.append(bp_name) - elif ("ear" in bp_name_components) and ( - "right" in bp_name_components - ): + elif ('ear' in bp_name_components) and ('right' in bp_name_components): ear_right_cords.append(bp_name) else: pass @@ -509,22 +483,14 @@ def check_directionality_viable(self): direction_viable = False if direction_viable: - nose_cords = [ - nose_cords[i * 2 : (i + 1) * 2] - for i in range((len(nose_cords) + 2 - 1) // 2) - ] - ear_left_cords = [ - ear_left_cords[i * 2 : (i + 1) * 2] - for i in range((len(ear_left_cords) + 2 - 1) // 2) - ] - ear_right_cords = [ - ear_right_cords[i * 2 : (i + 1) * 2] - for i in range((len(ear_right_cords) + 2 - 1) // 2) - ] + nose_cords = [nose_cords[i * 2:(i + 1) * 2] for i in range((len(nose_cords) + 2 - 1) // 2)] + ear_left_cords = [ear_left_cords[i * 2:(i + 1) * 2] for i in range((len(ear_left_cords) + 2 - 1) // 2)] + ear_right_cords = [ear_right_cords[i * 2:(i + 1) * 2] for i in range((len(ear_right_cords) + 2 - 1) // 2)] return direction_viable, nose_cords, ear_left_cords, ear_right_cords - def get_feature_extraction_headers(self, pose: str) -> List[str]: + def get_feature_extraction_headers(self, + pose: str) -> List[str]: """ Helper to return the headers names (body-part location columns) that should be used during feature extraction. @@ -532,21 +498,17 @@ def get_feature_extraction_headers(self, pose: str) -> List[str]: :return List[str]: The names and order of the pose-estimation columns. """ simba_dir = os.path.dirname(simba.__file__) - feature_categories_csv_path = os.path.join( - simba_dir, Paths.SIMBA_FEATURE_EXTRACTION_COL_NAMES_PATH.value - ) + feature_categories_csv_path = os.path.join(simba_dir, Paths.SIMBA_FEATURE_EXTRACTION_COL_NAMES_PATH.value) check_file_exist_and_readable(file_path=feature_categories_csv_path) bps = list(pd.read_csv(feature_categories_csv_path)[pose]) - return [x for x in bps if str(x) != "nan"] + return [x for x in bps if str(x) != 'nan'] @staticmethod @jit(nopython=True) - def jitted_line_crosses_to_nonstatic_targets( - left_ear_array: np.ndarray, - right_ear_array: np.ndarray, - nose_array: np.ndarray, - target_array: np.ndarray, - ) -> np.ndarray: + def jitted_line_crosses_to_nonstatic_targets(left_ear_array: np.ndarray, + right_ear_array: np.ndarray, + nose_array: np.ndarray, + target_array: np.ndarray) -> np.ndarray: """ Jitted helper to calculate if an animal is directing towards another animals body-part coordinate, given the target body-part and the left ear, right ear, and nose coordinates of the observer. @@ -581,20 +543,10 @@ def jitted_line_crosses_to_nonstatic_targets( Ph = np.sqrt(Px * Px + Py * Py) Qh = np.sqrt(Qx * Qx + Qy * Qy) Nh = np.sqrt(Nx * Nx + Ny * Ny) - if Nh < Ph and Nh < Qh and Qh < Ph: - results_array[frame_no] = [ - 0, - right_ear_array[frame_no][0], - right_ear_array[frame_no][1], - True, - ] - elif Nh < Ph and Nh < Qh and Ph < Qh: - results_array[frame_no] = [ - 1, - left_ear_array[frame_no][0], - left_ear_array[frame_no][1], - True, - ] + if (Nh < Ph and Nh < Qh and Qh < Ph): + results_array[frame_no] = [0, right_ear_array[frame_no][0], right_ear_array[frame_no][1], True] + elif (Nh < Ph and Nh < Qh and Ph < Qh): + results_array[frame_no] = [1, left_ear_array[frame_no][0], left_ear_array[frame_no][1], True] else: results_array[frame_no] = [2, -1, -1, False] @@ -602,12 +554,10 @@ def jitted_line_crosses_to_nonstatic_targets( @staticmethod @jit(nopython=True) - def jitted_line_crosses_to_static_targets( - left_ear_array: np.ndarray, - right_ear_array: np.ndarray, - nose_array: np.ndarray, - target_array: np.ndarray, - ) -> np.ndarray: + def jitted_line_crosses_to_static_targets(left_ear_array: np.ndarray, + right_ear_array: np.ndarray, + nose_array: np.ndarray, + target_array: np.ndarray) -> np.ndarray: """ Jitted helper to calculate if an animal is directing towards a static location (ROI centroid), given the target location and the left ear, right ear, and nose coordinates of the observer. @@ -644,20 +594,10 @@ def jitted_line_crosses_to_static_targets( Ph = np.sqrt(Px * Px + Py * Py) Qh = np.sqrt(Qx * Qx + Qy * Qy) Nh = np.sqrt(Nx * Nx + Ny * Ny) - if Nh < Ph and Nh < Qh and Qh < Ph: - results_array[frame_no] = [ - 0, - right_ear_array[frame_no][0], - right_ear_array[frame_no][1], - True, - ] - elif Nh < Ph and Nh < Qh and Ph < Qh: - results_array[frame_no] = [ - 1, - left_ear_array[frame_no][0], - left_ear_array[frame_no][1], - True, - ] + if (Nh < Ph and Nh < Qh and Qh < Ph): + results_array[frame_no] = [0, right_ear_array[frame_no][0], right_ear_array[frame_no][1], True] + elif (Nh < Ph and Nh < Qh and Ph < Qh): + results_array[frame_no] = [1, left_ear_array[frame_no][0], left_ear_array[frame_no][1], True] else: results_array[frame_no] = [2, -1, -1, False] @@ -665,6 +605,7 @@ def jitted_line_crosses_to_static_targets( @staticmethod def minimum_bounding_rectangle(points: np.ndarray) -> np.ndarray: + """ Finds the minimum bounding rectangle from convex hull vertices. @@ -688,23 +629,17 @@ def minimum_bounding_rectangle(points: np.ndarray) -> np.ndarray: >>> [[10.7260274 , 3.39726027], [ 1.4109589 , -0.09589041], [-0.31506849, 4.50684932], [ 9., 8. ]] """ - pi2 = np.pi / 2.0 + pi2 = np.pi / 2. hull_points = points[ConvexHull(points).vertices] edges = hull_points[1:] - hull_points[:-1] angles = np.arctan2(edges[:, 1], edges[:, 0]) angles = np.abs(np.mod(angles, pi2)) angles = np.unique(angles) - rotations = np.vstack( - [np.cos(angles), np.cos(angles - pi2), np.cos(angles + pi2), np.cos(angles)] - ).T + rotations = np.vstack([np.cos(angles), np.cos(angles - pi2), np.cos(angles + pi2), np.cos(angles)]).T rotations = rotations.reshape((-1, 2, 2)) rot_points = np.dot(rotations, hull_points.T) - min_x, max_x = np.nanmin(rot_points[:, 0], axis=1), np.nanmax( - rot_points[:, 0], axis=1 - ) - min_y, max_y = np.nanmin(rot_points[:, 1], axis=1), np.nanmax( - rot_points[:, 1], axis=1 - ) + min_x, max_x = np.nanmin(rot_points[:, 0], axis=1), np.nanmax(rot_points[:, 0], axis=1) + min_y, max_y = np.nanmin(rot_points[:, 1], axis=1), np.nanmax(rot_points[:, 1], axis=1) areas = (max_x - min_x) * (max_y - min_y) best_idx = np.argmin(areas) x1, x2 = max_x[best_idx], min_x[best_idx] @@ -715,14 +650,13 @@ def minimum_bounding_rectangle(points: np.ndarray) -> np.ndarray: rval[2], rval[3] = np.dot([x2, y1], r), np.dot([x1, y1], r) return rval + @staticmethod @jit(nopython=True) - def framewise_euclidean_distance( - location_1: np.ndarray, - location_2: np.ndarray, - px_per_mm: float, - centimeter: bool = False, - ) -> np.ndarray: + def framewise_euclidean_distance(location_1: np.ndarray, + location_2: np.ndarray, + px_per_mm: float, + centimeter: bool = False) -> np.ndarray: """ Jitted helper finding frame-wise distances between two moving locations in millimeter or centimeter. @@ -769,28 +703,26 @@ def framewise_euclidean_distance( # # return results.astype(int) - def change_in_bodypart_euclidean_distance( - self, - location_1: np.ndarray, - location_2: np.ndarray, - fps: int, - px_per_mm: float, - time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6]), - ) -> np.ndarray: + + def change_in_bodypart_euclidean_distance(self, + location_1: np.ndarray, + location_2: np.ndarray, + fps: int, + px_per_mm: float, + time_windows: np.ndarray = np.array([0.2, 0.4, 0.8, 1.6])) -> np.ndarray: """ Computes the difference between the distances of two body-parts in the current frame versus N.N seconds ago. Used for computing if animal body-parts are traveling away or towards each other within defined time-windows. """ - distances = self.framewise_euclidean_distance( - location_1=location_1, location_2=location_2, px_per_mm=px_per_mm - ) - return self._relative_distances( - distances=distances, fps=fps, time_windows=time_windows - ) + distances = self.framewise_euclidean_distance(location_1=location_1, location_2=location_2, px_per_mm=px_per_mm) + return self._relative_distances(distances=distances, fps=fps, time_windows=time_windows) + - def dataframe_gaussian_smoother( - self, df: pd.DataFrame, fps: int, time_window: int = 100 - ) -> pd.DataFrame: + + def dataframe_gaussian_smoother(self, + df: pd.DataFrame, + fps: int, + time_window: int=100) -> pd.DataFrame: """ Column-wise Gaussian smoothing of dataframe. @@ -805,21 +737,14 @@ def dataframe_gaussian_smoother( frames_in_time_window = int(time_window / (1000 / fps)) for c in df.columns: - df[c] = ( - df[c] - .rolling( - window=int(frames_in_time_window), win_type="gaussian", center=True - ) - .mean(std=5) - .fillna(df[c]) - .abs() - .astype(int) - ) + df[c] = df[c].rolling(window=int(frames_in_time_window), win_type='gaussian', center=True).mean(std=5).fillna(df[c]).abs().astype(int) return df - def dataframe_savgol_smoother( - self, df: pd.DataFrame, fps: int, time_window: int = 150 - ) -> pd.DataFrame: + + def dataframe_savgol_smoother(self, + df: pd.DataFrame, + fps: int, + time_window: int = 150) -> pd.DataFrame: """ Column-wise Savitzky-Golay smoothing of dataframe. @@ -833,17 +758,10 @@ def dataframe_savgol_smoother( """ frames_in_time_window = int(time_window / (1000 / fps)) - if (frames_in_time_window % 2) == 0: - frames_in_time_window = frames_in_time_window - 1 - if (frames_in_time_window % 2) <= 3: - frames_in_time_window = 5 + if (frames_in_time_window % 2) == 0: frames_in_time_window = frames_in_time_window - 1 + if (frames_in_time_window % 2) <= 3: frames_in_time_window = 5 for c in df.columns: - df[c] = savgol_filter( - x=df[c].to_numpy(), - window_length=frames_in_time_window, - polyorder=3, - mode="nearest", - ).astype(int) + df[c] = savgol_filter(x=df[c].to_numpy(), window_length=frames_in_time_window, polyorder=3, mode='nearest').astype(int) return df def get_bp_headers(self) -> None: @@ -852,10 +770,11 @@ def get_bp_headers(self) -> None: """ self.col_headers = [] for bp in self.body_parts_lst: - c1, c2, c3 = (f"{bp}_x", f"{bp}_y", f"{bp}_p") + c1, c2, c3 = (f'{bp}_x', f'{bp}_y', f'{bp}_p') self.col_headers.extend((c1, c2, c3)) def check_directionality_cords(self) -> dict: + """ Helper to check if ear and nose body-parts are present within the pose-estimation data. @@ -866,65 +785,47 @@ def check_directionality_cords(self) -> dict: results = {} for animal in self.animal_bp_dict.keys(): results[animal] = {} - results[animal]["Nose"] = {} - results[animal]["Ear_left"] = {} - results[animal]["Ear_right"] = {} - for dimension in ["X_bps", "Y_bps"]: + results[animal]['Nose'] = {} + results[animal]['Ear_left'] = {} + results[animal]['Ear_right'] = {} + for dimension in ['X_bps', 'Y_bps']: for cord in self.animal_bp_dict[animal][dimension]: if ("nose" in cord.lower()) and ("x" in cord.lower()): - results[animal]["Nose"]["X_bps"] = cord + results[animal]['Nose']['X_bps'] = cord elif ("nose" in cord.lower()) and ("y" in cord.lower()): - results[animal]["Nose"]["Y_bps"] = cord - elif ( - ("left" in cord.lower()) - and ("x" in cord.lower()) - and ("ear" in cord.lower()) - ): - results[animal]["Ear_left"]["X_bps"] = cord - elif ( - ("left" in cord.lower()) - and ("Y".lower() in cord.lower()) - and ("ear".lower() in cord.lower()) - ): - results[animal]["Ear_left"]["Y_bps"] = cord - elif ( - ("right" in cord.lower()) - and ("x" in cord.lower()) - and ("ear" in cord.lower()) - ): - results[animal]["Ear_right"]["X_bps"] = cord - elif ( - ("right" in cord.lower()) - and ("y" in cord.lower()) - and ("ear".lower() in cord.lower()) - ): - results[animal]["Ear_right"]["Y_bps"] = cord + results[animal]['Nose']['Y_bps'] = cord + elif ("left" in cord.lower()) and ("x" in cord.lower()) and ("ear" in cord.lower()): + results[animal]['Ear_left']['X_bps'] = cord + elif ("left" in cord.lower()) and ("Y".lower() in cord.lower()) and ("ear".lower() in cord.lower()): + results[animal]['Ear_left']['Y_bps'] = cord + elif ("right" in cord.lower()) and ("x" in cord.lower()) and ("ear" in cord.lower()): + results[animal]['Ear_right']['X_bps'] = cord + elif ("right" in cord.lower()) and ("y" in cord.lower()) and ("ear".lower() in cord.lower()): + results[animal]['Ear_right']['Y_bps'] = cord return results - def insert_default_headers_for_feature_extraction( - self, df: pd.DataFrame, headers: List[str], pose_config: str, filename: str - ) -> pd.DataFrame: + def insert_default_headers_for_feature_extraction(self, + df: pd.DataFrame, + headers: List[str], + pose_config: str, + filename: str) -> pd.DataFrame: """ Helper to insert correct body-part column names prior to defualt feature extraction methods. """ if len(headers) != len(df.columns): - raise CountError( - f"Your SimBA project is set to using the default {pose_config} pose-configuration. " - f"SimBA therefore expects {str(len(headers))} columns of data inside the files within the project_folder. However, " - f"within file {filename} file, SimBA found {str(len(df.columns))} columns." - ) + raise CountError(f'Your SimBA project is set to using the default {pose_config} pose-configuration. ' + f'SimBA therefore expects {str(len(headers))} columns of data inside the files within the project_folder. However, ' + f'within file {filename} file, SimBA found {str(len(df.columns))} columns.', source=self.__class__.__name__) else: df.columns = headers return df @staticmethod - def line_crosses_to_static_targets( - p: List[float], - q: List[float], - n: List[float], - M: List[float], - coord: List[float], - ) -> (bool, List[float]): + def line_crosses_to_static_targets(p: List[float], + q: List[float], + n: List[float], + M: List[float], + coord: List[float]) -> (bool, List[float]): """ Legacy non-jitted helper to calculate if an animal is directing towards a static coordinate (e.g., ROI centroid). @@ -950,16 +851,31 @@ def line_crosses_to_static_targets( Ph = np.sqrt(Px * Px + Py * Py) Qh = np.sqrt(Qx * Qx + Qy * Qy) Nh = np.sqrt(Nx * Nx + Ny * Ny) - if Nh < Ph and Nh < Qh and Qh < Ph: + if (Nh < Ph and Nh < Qh and Qh < Ph): coord.extend((q[0], q[1])) return True, coord - elif Nh < Ph and Nh < Qh and Ph < Qh: + elif (Nh < Ph and Nh < Qh and Ph < Qh): coord.extend((p[0], p[1])) return True, coord else: return False, coord + + +#cdist_3d + +# +# array_1 = np.array([[[100, 200], [301, 198], [234, 512]], +# [[102, 209], [298, 175], [240, 514]]]).astype(np.float32) +# +# array_2 = np.array([[[450, 901], [309, 892], [447, 896]], +# [[449, 903], [303, 879], [451, 899]]]).astype(np.float32) +# +# FeatureExtractionMixin.cdist_3d(array_1=array_1, array_2=array_2) +# + + # # points = np.random.randint(1, 10, size=(10, 2)) -# FeatureExtractionMixin.minimum_bounding_rectangle(points=points) +# FeatureExtractionMixin.minimum_bounding_rectangle(points=points) \ No newline at end of file