From 072963b338eb4d59cf9cb8738361dc9180ebf72f Mon Sep 17 00:00:00 2001 From: sronilsson Date: Sun, 31 Mar 2024 17:08:45 +0000 Subject: [PATCH] cleaned --- .../feature_extraction_supplement_mixin.py | 39 ++- simba/roi_tools/ROI_time_bin_calculator.py | 239 ++++++++++++++---- 2 files changed, 219 insertions(+), 59 deletions(-) diff --git a/simba/mixins/feature_extraction_supplement_mixin.py b/simba/mixins/feature_extraction_supplement_mixin.py index 79674214c..6ce6a7e1e 100644 --- a/simba/mixins/feature_extraction_supplement_mixin.py +++ b/simba/mixins/feature_extraction_supplement_mixin.py @@ -719,10 +719,12 @@ def sequential_lag_analysis( return np.round(preceding_cnt / (preceding_cnt + proceeding_cnt), 3) @staticmethod - def distance_and_velocity(x: np.array, - fps: float, - pixels_per_mm: float, - centimeters: Optional[bool] = True) -> Tuple[float, float]: + def distance_and_velocity( + x: np.array, + fps: float, + pixels_per_mm: float, + centimeters: Optional[bool] = True, + ) -> Tuple[float, float]: """ Calculate total movement and mean velocity from a sequence of position data. @@ -737,20 +739,29 @@ def distance_and_velocity(x: np.array, >>> sum_movement, avg_velocity = FeatureExtractionSupplemental.distance_and_velocity(x=x, fps=10, pixels_per_mm=10, centimeters=True) """ - check_valid_array(data=x, source=FeatureExtractionSupplemental.distance_and_velocity.__name__, accepted_ndims=(1,), - accepted_dtypes=(np.float32, np.float64, np.int32, np.int64, int, float), min_axis_0=1) - check_float(name=f'{FeatureExtractionSupplemental.distance_and_velocity.__name__} fps', value=fps, min_value=1) - check_float(name=f'{FeatureExtractionSupplemental.distance_and_velocity.__name__} pixels_per_mm', value=pixels_per_mm, min_value=10e-6) - movement = (np.sum(x) / pixels_per_mm) + check_valid_array( + data=x, + source=FeatureExtractionSupplemental.distance_and_velocity.__name__, + accepted_ndims=(1,), + accepted_dtypes=(np.float32, np.float64, np.int32, np.int64, int, float), + min_axis_0=1, + ) + check_float( + name=f"{FeatureExtractionSupplemental.distance_and_velocity.__name__} fps", + value=fps, + min_value=1, + ) + check_float( + name=f"{FeatureExtractionSupplemental.distance_and_velocity.__name__} pixels_per_mm", + value=pixels_per_mm, + min_value=10e-6, + ) + movement = np.sum(x) / pixels_per_mm v = [] for i in range(0, x.shape[0], int(fps)): - w = x[i: (i + int(fps))] + w = x[i : (i + int(fps))] v.append((np.sum(w) / pixels_per_mm) * (1 / (w.shape[0] / int(fps)))) if centimeters: v = [vi / 10 for vi in v] movement = movement / 10 return movement, np.mean(v) - - - - diff --git a/simba/roi_tools/ROI_time_bin_calculator.py b/simba/roi_tools/ROI_time_bin_calculator.py index dadb267eb..c00347287 100644 --- a/simba/roi_tools/ROI_time_bin_calculator.py +++ b/simba/roi_tools/ROI_time_bin_calculator.py @@ -1,13 +1,16 @@ import itertools import os -from typing import List, Union, Optional import statistics +from typing import List, Optional, Union + import pandas as pd +from simba.data_processors.timebins_movement_calculator import \ + TimeBinsMovementCalculator from simba.mixins.config_reader import ConfigReader -from simba.mixins.feature_extraction_supplement_mixin import FeatureExtractionSupplemental +from simba.mixins.feature_extraction_supplement_mixin import \ + FeatureExtractionSupplemental from simba.roi_tools.ROI_analyzer import ROIAnalyzer -from simba.data_processors.timebins_movement_calculator import TimeBinsMovementCalculator from simba.utils.checks import check_float, check_if_filepath_list_is_empty from simba.utils.enums import DirNames from simba.utils.errors import FrameRangeError, ROICoordinatesNotFoundError @@ -41,82 +44,228 @@ class ROITimebinCalculator(ConfigReader): >>> calculator.save() """ - def __init__(self, - config_path: Union[str, os.PathLike], - bin_length: float, - body_parts: List[str], - threshold: float, - movement: Optional[bool] = False): + def __init__( + self, + config_path: Union[str, os.PathLike], + bin_length: float, + body_parts: List[str], + threshold: float, + movement: Optional[bool] = False, + ): ConfigReader.__init__(self, config_path=config_path) if not os.path.isfile(self.roi_coordinates_path): - raise ROICoordinatesNotFoundError(expected_file_path=self.roi_coordinates_path) + raise ROICoordinatesNotFoundError( + expected_file_path=self.roi_coordinates_path + ) check_float(name="bin_length", value=bin_length, min_value=10e-6) check_float(name="threshold", value=threshold, min_value=0.0, max_value=1.0) self.read_roi_data() - self.bin_length, self.body_parts, self.threshold = (bin_length, body_parts, threshold) - self.save_path_time = os.path.join(self.logs_path, f"ROI_time_bins_{bin_length}s_time_data_{self.datetime}.csv") - self.save_path_entries = os.path.join(self.logs_path, f"ROI_time_bins_{bin_length}s_entry_data_{self.datetime}.csv") + self.bin_length, self.body_parts, self.threshold = ( + bin_length, + body_parts, + threshold, + ) + self.save_path_time = os.path.join( + self.logs_path, f"ROI_time_bins_{bin_length}s_time_data_{self.datetime}.csv" + ) + self.save_path_entries = os.path.join( + self.logs_path, + f"ROI_time_bins_{bin_length}s_entry_data_{self.datetime}.csv", + ) self.settings = {"threshold": threshold, "body_parts": {}} for i in body_parts: - animal_name = self.find_animal_name_from_body_part_name(bp_name=i, bp_dict=self.animal_bp_dict) + animal_name = self.find_animal_name_from_body_part_name( + bp_name=i, bp_dict=self.animal_bp_dict + ) self.settings["body_parts"][animal_name] = i - self.roi_analyzer = ROIAnalyzer(ini_path=self.config_path, data_path=DirNames.OUTLIER_MOVEMENT_LOCATION.value, calculate_distances=False, settings=self.settings) + self.roi_analyzer = ROIAnalyzer( + ini_path=self.config_path, + data_path=DirNames.OUTLIER_MOVEMENT_LOCATION.value, + calculate_distances=False, + settings=self.settings, + ) self.roi_analyzer.run() self.animal_names = list(self.roi_analyzer.bp_dict.keys()) self.entries_exits_df = self.roi_analyzer.detailed_df self.movement = movement if movement: - self.save_path_movement_velocity = os.path.join(self.logs_path, f"ROI_time_bins_{bin_length}s_movement_velocity_data_{self.datetime}.csv") - self.movement_timebins = TimeBinsMovementCalculator(config_path=config_path, bin_length=bin_length, body_parts=body_parts, plots=False) + self.save_path_movement_velocity = os.path.join( + self.logs_path, + f"ROI_time_bins_{bin_length}s_movement_velocity_data_{self.datetime}.csv", + ) + self.movement_timebins = TimeBinsMovementCalculator( + config_path=config_path, + bin_length=bin_length, + body_parts=body_parts, + plots=False, + ) self.movement_timebins.run() def run(self): - self.results_entries = pd.DataFrame(columns=["VIDEO", "SHAPE", "ANIMAL", "BODY-PART", "TIME BIN #", "ENTRY COUNT"]) - self.results_time = pd.DataFrame(columns=["VIDEO", "SHAPE", "ANIMAL", "BODY-PART", "TIME BIN #", "TIME INSIDE SHAPE (S)"]) - self.results_movement_velocity = pd.DataFrame( columns=["VIDEO", "SHAPE", "ANIMAL", "BODY-PART", "TIME BIN #", "DISTANCE (CM)", "VELOCITY (CM/S)"]) - check_if_filepath_list_is_empty(filepaths=self.outlier_corrected_paths, error_msg=f'No data files found in {self.outlier_corrected_dir}') - print(f"Analyzing time-bin data for {len(self.outlier_corrected_paths)} video(s)...") + self.results_entries = pd.DataFrame( + columns=[ + "VIDEO", + "SHAPE", + "ANIMAL", + "BODY-PART", + "TIME BIN #", + "ENTRY COUNT", + ] + ) + self.results_time = pd.DataFrame( + columns=[ + "VIDEO", + "SHAPE", + "ANIMAL", + "BODY-PART", + "TIME BIN #", + "TIME INSIDE SHAPE (S)", + ] + ) + self.results_movement_velocity = pd.DataFrame( + columns=[ + "VIDEO", + "SHAPE", + "ANIMAL", + "BODY-PART", + "TIME BIN #", + "DISTANCE (CM)", + "VELOCITY (CM/S)", + ] + ) + check_if_filepath_list_is_empty( + filepaths=self.outlier_corrected_paths, + error_msg=f"No data files found in {self.outlier_corrected_dir}", + ) + print( + f"Analyzing time-bin data for {len(self.outlier_corrected_paths)} video(s)..." + ) for file_cnt, file_path in enumerate(self.outlier_corrected_paths): video_timer = SimbaTimer(start=True) _, self.video_name, _ = get_fn_ext(filepath=file_path) _, px_per_mm, fps = self.read_video_info(video_name=self.video_name) frames_per_bin = int(fps * self.bin_length) if frames_per_bin == 0: - raise FrameRangeError(msg=f"The specified time-bin length of {self.bin_length} is TOO SHORT for video {self.video_name} which has a specified FPS of {fps}. This results in time bins that are LESS THAN a single frame.",source=self.__class__.__name__) - video_frms = list(range(0, len(read_df(file_path=file_path, file_type=self.file_type)))) - frame_bins = [video_frms[i * frames_per_bin : (i + 1) * frames_per_bin] for i in range((len(video_frms) + frames_per_bin - 1) // frames_per_bin)] - self.video_data = self.entries_exits_df[self.entries_exits_df["VIDEO"] == self.video_name] - for animal_name, shape_name in list(itertools.product(self.animal_names, self.shape_names)): - body_part = self.settings['body_parts'][animal_name] - data_df = self.video_data.loc[(self.video_data["SHAPE"] == shape_name) & (self.video_data["ANIMAL"] == animal_name)] + raise FrameRangeError( + msg=f"The specified time-bin length of {self.bin_length} is TOO SHORT for video {self.video_name} which has a specified FPS of {fps}. This results in time bins that are LESS THAN a single frame.", + source=self.__class__.__name__, + ) + video_frms = list( + range(0, len(read_df(file_path=file_path, file_type=self.file_type))) + ) + frame_bins = [ + video_frms[i * frames_per_bin : (i + 1) * frames_per_bin] + for i in range((len(video_frms) + frames_per_bin - 1) // frames_per_bin) + ] + self.video_data = self.entries_exits_df[ + self.entries_exits_df["VIDEO"] == self.video_name + ] + for animal_name, shape_name in list( + itertools.product(self.animal_names, self.shape_names) + ): + body_part = self.settings["body_parts"][animal_name] + data_df = self.video_data.loc[ + (self.video_data["SHAPE"] == shape_name) + & (self.video_data["ANIMAL"] == animal_name) + ] entry_frms = list(data_df["ENTRY FRAMES"]) - inside_shape_frms = [list(range(x, y)) for x, y in zip(list(data_df["ENTRY FRAMES"].astype(int)),list(data_df["EXIT FRAMES"].astype(int) + 1))] + inside_shape_frms = [ + list(range(x, y)) + for x, y in zip( + list(data_df["ENTRY FRAMES"].astype(int)), + list(data_df["EXIT FRAMES"].astype(int) + 1), + ) + ] inside_shape_frms = [i for s in inside_shape_frms for i in s] for bin_cnt, bin_frms in enumerate(frame_bins): - frms_inside_roi_in_timebin = [x for x in inside_shape_frms if x in bin_frms] + frms_inside_roi_in_timebin = [ + x for x in inside_shape_frms if x in bin_frms + ] entry_roi_in_timebin = [x for x in entry_frms if x in bin_frms] - self.results_time.loc[len(self.results_time)] = [self.video_name, shape_name, animal_name, body_part, bin_cnt, len(frms_inside_roi_in_timebin) / fps] - self.results_entries.loc[len(self.results_entries)] = [self.video_name, shape_name, animal_name, body_part, bin_cnt, len(entry_roi_in_timebin)] + self.results_time.loc[len(self.results_time)] = [ + self.video_name, + shape_name, + animal_name, + body_part, + bin_cnt, + len(frms_inside_roi_in_timebin) / fps, + ] + self.results_entries.loc[len(self.results_entries)] = [ + self.video_name, + shape_name, + animal_name, + body_part, + bin_cnt, + len(entry_roi_in_timebin), + ] if self.movement: - if (len(frms_inside_roi_in_timebin) > 0): - bin_move = self.movement_timebins.movement_dict[self.video_name].iloc[frms_inside_roi_in_timebin].values.flatten() - movement, velocity = FeatureExtractionSupplemental.distance_and_velocity(x=bin_move, fps=fps, pixels_per_mm=px_per_mm, centimeters=True) - self.results_movement_velocity.loc[len(self.results_movement_velocity)] = [self.video_name, shape_name, animal_name, body_part, bin_cnt, round(movement, 4), round(velocity, 4)] + if len(frms_inside_roi_in_timebin) > 0: + bin_move = ( + self.movement_timebins.movement_dict[self.video_name] + .iloc[frms_inside_roi_in_timebin] + .values.flatten() + ) + movement, velocity = ( + FeatureExtractionSupplemental.distance_and_velocity( + x=bin_move, + fps=fps, + pixels_per_mm=px_per_mm, + centimeters=True, + ) + ) + self.results_movement_velocity.loc[ + len(self.results_movement_velocity) + ] = [ + self.video_name, + shape_name, + animal_name, + body_part, + bin_cnt, + round(movement, 4), + round(velocity, 4), + ] else: - self.results_movement_velocity.loc[len(self.results_movement_velocity)] = [self.video_name, shape_name, animal_name, body_part, bin_cnt, 0, 0] + self.results_movement_velocity.loc[ + len(self.results_movement_velocity) + ] = [ + self.video_name, + shape_name, + animal_name, + body_part, + bin_cnt, + 0, + 0, + ] video_timer.stop_timer() - print(f"Video {self.video_name} complete (elapsed time {video_timer.elapsed_time_str}s)") + print( + f"Video {self.video_name} complete (elapsed time {video_timer.elapsed_time_str}s)" + ) def save(self): - self.results_time.sort_values(by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"]).set_index('VIDEO').to_csv(self.save_path_time) - self.results_entries.sort_values(by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"]).set_index('VIDEO').to_csv(self.save_path_entries) + self.results_time.sort_values( + by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"] + ).set_index("VIDEO").to_csv(self.save_path_time) + self.results_entries.sort_values( + by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"] + ).set_index("VIDEO").to_csv(self.save_path_entries) self.timer.stop_timer() - stdout_success(msg=f"ROI time bin entry data saved at {self.save_path_entries}",elapsed_time=self.timer.elapsed_time_str) - stdout_success(msg=f"ROI time bin time data saved at {self.save_path_time}",elapsed_time=self.timer.elapsed_time_str) + stdout_success( + msg=f"ROI time bin entry data saved at {self.save_path_entries}", + elapsed_time=self.timer.elapsed_time_str, + ) + stdout_success( + msg=f"ROI time bin time data saved at {self.save_path_time}", + elapsed_time=self.timer.elapsed_time_str, + ) if self.movement: - self.results_movement_velocity.sort_values(by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"]).set_index('VIDEO').to_csv(self.save_path_movement_velocity) - stdout_success(msg=f"ROI time-bin movement data saved at {self.save_path_movement_velocity}", elapsed_time=self.timer.elapsed_time_str) + self.results_movement_velocity.sort_values( + by=["VIDEO", "SHAPE", "ANIMAL", "TIME BIN #"] + ).set_index("VIDEO").to_csv(self.save_path_movement_velocity) + stdout_success( + msg=f"ROI time-bin movement data saved at {self.save_path_movement_velocity}", + elapsed_time=self.timer.elapsed_time_str, + ) # test = ROITimebinCalculator(config_path=r"/Users/simon/Desktop/envs/simba/troubleshooting/two_black_animals_14bp/project_folder/project_config.ini",