def poly_area(data: np.ndarray,
              pixels_per_mm: Optional[float] = 1.0,
              batch_size: Optional[int] = int(0.5e+7)) -> np.ndarray:
    """
    Compute the area of polygons on the GPU using CuPy.

    Each 2D slice along the first axis of ``data`` is one polygon; each row of the slice is a
    vertex and the two columns are read as the x and y coordinates. The area is computed with
    the shoelace formula (half the absolute difference of the two cross-product sums) and then
    divided by ``pixels_per_mm``. Polygons are moved to the GPU ``batch_size`` at a time.

    :param data: A 3D numeric numpy array of shape (N, M, 2), where N is the number of polygons and M is the number of vertices per polygon.
    :param pixels_per_mm: Value each polygon area is divided by. Default is 1.0 (areas are returned in pixel units).
    :param batch_size: Number of polygons transferred to, and processed on, the GPU per iteration. Default is 0.5e+7.
    :return: A 1D float numpy array of shape (N,) holding one area per polygon.
    :raises ValueError: If ``batch_size`` is not an integer of at least 1.
    """

    check_valid_array(data=data, source=f'{poly_area.__name__} data', accepted_ndims=(3,), accepted_dtypes=Formats.NUMERIC_DTYPES.value)
    check_float(name=f'{poly_area.__name__} pixels_per_mm', min_value=10e-16, value=pixels_per_mm)
    if isinstance(batch_size, bool) or not isinstance(batch_size, (int, np.integer)) or batch_size < 1:
        raise ValueError(f'{poly_area.__name__} batch_size has to be an integer of at least 1, but got {batch_size}')
    # Float buffer: NaN is not representable in an integer array, and an integer buffer would
    # truncate the fractional part of every area.
    results = cp.full((data.shape[0]), fill_value=cp.nan, dtype=cp.float64)
    for batch_start in range(0, data.shape[0], batch_size):
        batch_end = batch_start + batch_size
        x = cp.asarray(data[batch_start:batch_end, :, 0])
        y = cp.asarray(data[batch_start:batch_end, :, 1])
        # Rolling by one vertex pairs every vertex with its predecessor (wrapping around).
        x_rolled = cp.roll(x, shift=1, axis=1)
        y_rolled = cp.roll(y, shift=1, axis=1)
        sum_x_prev_y = cp.sum(x * y_rolled, axis=1)
        sum_y_prev_x = cp.sum(y * x_rolled, axis=1)
        # NOTE(review): an area in pixels^2 is divided by pixels_per_mm, not pixels_per_mm ** 2.
        # Kept as in the original; confirm the intended unit with the callers.
        results[batch_start:batch_end] = (0.5 * cp.abs(sum_x_prev_y - sum_y_prev_x)) / pixels_per_mm

    # .get() copies the result from GPU memory back into a numpy array.
    return results.get()
source=img_stack_to_grayscale_cupy.__name__) if imgs.ndim != 4: return imgs - results = cp.zeros((imgs.shape[0], imgs.shape[1], imgs.shape[2]), dtype=np.uint8) + results = cp.zeros((imgs.shape[0], imgs.shape[1], imgs.shape[2]), dtype=np.uint8) n = int(np.ceil((imgs.shape[0] / batch_size))) imgs = np.array_split(imgs, n) start = 0 diff --git a/simba/mixins/feature_extraction_mixin.py b/simba/mixins/feature_extraction_mixin.py index 9850a38df..b96b8f866 100644 --- a/simba/mixins/feature_extraction_mixin.py +++ b/simba/mixins/feature_extraction_mixin.py @@ -713,17 +713,11 @@ def minimum_bounding_rectangle(points: np.ndarray) -> np.ndarray: angles = np.arctan2(edges[:, 1], edges[:, 0]) angles = np.abs(np.mod(angles, pi2)) angles = np.unique(angles) - rotations = np.vstack( - [np.cos(angles), np.cos(angles - pi2), np.cos(angles + pi2), np.cos(angles)] - ).T + rotations = np.vstack([np.cos(angles), np.cos(angles - pi2), np.cos(angles + pi2), np.cos(angles)]).T rotations = rotations.reshape((-1, 2, 2)) rot_points = np.dot(rotations, hull_points.T) - min_x, max_x = np.nanmin(rot_points[:, 0], axis=1), np.nanmax( - rot_points[:, 0], axis=1 - ) - min_y, max_y = np.nanmin(rot_points[:, 1], axis=1), np.nanmax( - rot_points[:, 1], axis=1 - ) + min_x, max_x = np.nanmin(rot_points[:, 0], axis=1), np.nanmax(rot_points[:, 0], axis=1) + min_y, max_y = np.nanmin(rot_points[:, 1], axis=1), np.nanmax(rot_points[:, 1], axis=1) areas = (max_x - min_x) * (max_y - min_y) best_idx = np.argmin(areas) x1, x2 = max_x[best_idx], min_x[best_idx] diff --git a/simba/third_party_label_appenders/BENTO_appender.py b/simba/third_party_label_appenders/BENTO_appender.py index a9e3afa03..691941396 100644 --- a/simba/third_party_label_appenders/BENTO_appender.py +++ b/simba/third_party_label_appenders/BENTO_appender.py @@ -3,34 +3,30 @@ import glob import os from copy import deepcopy - import pandas as pd +from typing import Union, Dict, Optional +import numpy as np from simba.mixins.config_reader import 
class BentoAppender(ConfigReader):
    """
    Append BENTO annotation to SimBA featurized datasets.

    For every ``.annot`` file found in ``data_dir``, the features file with the same name is read
    from the project's features directory, one 0/1 column is added per project classifier, and the
    result is written to the project's targets folder.

    .. note::
       See the SimBA GitHub documentation for an example BENTO input file and tutorial.

    :param str config_path: path to SimBA project config file in Configparser format
    :param str data_dir: Path to folder containing BENTO data.

    :example:
    >>> bento_dir = 'tests/test_data/bento_example'
    >>> config_path = 'tests/test_data/import_tests/project_folder/project_config.ini'
    >>> bento_appender = BentoAppender(config_path=config_path, data_dir=bento_dir)
    >>> bento_appender.run()

    References
    ----------
    .. [1] Segalin et al., eLife, https://doi.org/10.7554/eLife.63720
    """

    def __init__(self,
                 config_path: Union[str, os.PathLike],
                 data_dir: Union[str, os.PathLike]):

        ConfigReader.__init__(self, config_path=config_path)
        check_if_dir_exists(in_dir=data_dir)
        self.bento_files = find_files_of_filetypes_in_directory(directory=data_dir, extensions=['.annot'], raise_error=False, raise_warning=True)
        check_if_filepath_list_is_empty(filepaths=self.feature_file_paths, error_msg="SIMBA ERROR: No feature files found in project_folder/csv/features_extracted. Extract Features BEFORE appending BENTO annotations")
        check_if_filepath_list_is_empty(filepaths=self.bento_files, error_msg=f"SIMBA ERROR: No BENTO files with .annot extension found in {data_dir}.")
        # Paths of the target files written so far by run().
        self.saved_files = []

    def run(self):
        """
        Append the annotations of every BENTO file to its features file and save the result.

        :raises NoFilesFoundError: If a BENTO file has no features file with the same name in the features directory.
        """
        check_all_file_names_are_represented_in_video_log(video_info_df=self.video_info_df, data_paths=self.bento_files)
        for bento_file_path in self.bento_files:
            _, self.video_name, _ = get_fn_ext(filepath=bento_file_path)
            print(f"Appending BENTO annotation to video {self.video_name}...")
            _, _, fps = self.read_video_info(video_name=self.video_name)
            features_path = os.path.join(self.features_dir, self.video_name + f'.{self.file_type}')
            if not os.path.isfile(features_path):
                raise NoFilesFoundError(msg=f'No features file for annotation file {self.video_name} file in {self.features_dir}. SimBA is expecting a file at path {features_path}')
            self.save_path = os.path.join(self.targets_folder, self.video_name + f'.{self.file_type}')
            feature_df = read_df(file_path=features_path, file_type=self.file_type)
            frame_cnt = len(feature_df)
            self.results = deepcopy(feature_df)
            bento_dict = bento_file_reader(file_path=bento_file_path, fps=fps, save_path=None, orient='index')
            for clf_name in self.clf_names:
                # Every project classifier gets a column; frames without annotation stay 0.
                self.results[clf_name] = 0
                if clf_name not in bento_dict:
                    ThirdPartyAnnotationsClfMissingWarning(video_name=self.video_name, clf_name=clf_name)
                    continue
                clf_bento_df = bento_dict[clf_name]
                # Expand each START/STOP bout into its frame numbers. Iterating the two columns
                # directly (rather than DataFrame.apply) also works when the bout table is empty.
                # NOTE(review): the STOP frame itself is excluded by range(); confirm this is intended.
                annotations_idx = [frm
                                   for start, stop in zip(clf_bento_df["START"], clf_bento_df["STOP"])
                                   for frm in range(int(start), int(stop))]
                # Valid positions are 0 .. frame_cnt - 1, so frame_cnt itself is already outside.
                valid_annotation_ids = [x for x in annotations_idx if 0 <= x < frame_cnt]
                annotations_idx_outside_video = [x for x in annotations_idx if x >= frame_cnt]

                if len(annotations_idx_outside_video) > 0:
                    ThirdPartyAnnotationsOutsidePoseEstimationDataWarning(video_name=self.video_name,
                                                                          clf_name=clf_name,
                                                                          frm_cnt=frame_cnt,
                                                                          first_error_frm=annotations_idx_outside_video[0],
                                                                          ambiguous_cnt=len(annotations_idx_outside_video))
                if len(valid_annotation_ids) > 0:
                    print(f"Appending {str(len(valid_annotation_ids))} {clf_name} frame annotations to video {self.video_name}...")
                    # Assumes the features file is indexed 0..N-1 so labels equal frame numbers - TODO confirm.
                    self.results.loc[valid_annotation_ids, clf_name] = 1
            self.__save()
        stdout_success(msg=f"Annotations for {str(len(self.saved_files))} video(s) and saved in the {self.targets_folder}.")

    def __save(self):
        """Write the current results to ``self.save_path`` and record the path in ``self.saved_files``."""
        write_df(df=self.results, file_type=self.file_type, save_path=self.save_path)
        self.saved_files.append(self.save_path)
        print(f"BENTO annotations appended to video {self.video_name} and saved in {self.save_path}")
].values[0] +# # clf_df = sliced_annot.loc[clf_start_idx + 2 :, :] +# # end_idx = ( +# # clf_df.isnull()[clf_df.isnull().any(axis=1)].idxmax(axis=1).index +# # ) +# # if end_idx.values: +# # end_idx = end_idx.values[0] +# # else: +# # end_idx = max(clf_df.index + 1) +# # clf_df = clf_df.loc[: end_idx - 1, :].reset_index(drop=True) +# # clf_df.columns = ["start_time", "stop_time", "duration"] +# # clf_df["start_frm"] = clf_df["start_time"].astype(float) * fps +# # clf_df["end_frm"] = clf_df["stop_time"].astype(float) * fps +# # clf_df["start_frm"] = clf_df["start_frm"].astype(int) +# # clf_df["end_frm"] = clf_df["end_frm"].astype(int) +# # annotations_idx = list( +# # clf_df.apply( +# # lambda x: list( +# # range(int(x["start_frm"]), int(x["end_frm"]) + 1) +# # ), +# # 1, +# # ) +# # ) +# # annotations_idx = [i for s in annotations_idx for i in s] +# # annotations_idx_outside_video = [ +# # x for x in annotations_idx if x > video_frm_length +# # ] +# # valid_annotation_ids = [ +# # x for x in annotations_idx if x <= video_frm_length +# # ] +# # if len(annotations_idx_outside_video): +# # ThirdPartyAnnotationsOutsidePoseEstimationDataWarning( +# # video_name=self.video_name, +# # clf_name=clf_name, +# # frm_cnt=len(feature_df), +# # first_error_frm=annotations_idx_outside_video[0], +# # ambiguous_cnt=len(annotations_idx_outside_video), +# # ) +# # if len(valid_annotation_ids) > 0: +# # print( +# # f"Appending {str(len(valid_annotation_ids))} {clf_name} frame annotations to video {self.video_name}..." +# # ) +# # self.results_df.loc[valid_annotation_ids, clf_name] = 1 +# # self.__save() +# # stdout_success( +# # msg=f"Annotations for {str(len(self.saved_files))} video(s) and saved in project_folder/csv/targets_inserted directory." 
# Module-level constant kept from the original file.
BENTO = "Bento"


def read_bento_files(data_paths: Union[List[str], str, os.PathLike],
                     video_info_df: Union[str, os.PathLike, pd.DataFrame],
                     error_setting: Optional[str] = None,
                     log_setting: Optional[bool] = False) -> Dict[str, pd.DataFrame]:

    r"""
    Read multiple BENTO annotation files into a dictionary holding one dataframe per video.

    Each file is parsed with ``bento_file_reader`` (``orient='columns'``) using the FPS looked up
    for the video in ``video_info_df``, and the per-behavior dataframes of the file are
    concatenated into a single dataframe. Files for which the reader returns no annotations
    (for example an invalid file when errors are not raised) are left out of the result.

    :param Union[List[str], str, os.PathLike] data_paths: List of paths to BENTO annotation files, or a directory. If a directory is provided, all files with the extension '.annot' in it are processed.
    :param Union[str, os.PathLike, pd.DataFrame] video_info_df: Path to a CSV file containing video information, or an already loaded dataframe with that information. Used to look up the FPS of each video.
    :param Optional[str] error_setting: Either ``Methods.ERROR.value``, ``Methods.WARNING.value`` or None. With ``Methods.ERROR.value`` the reader is asked to raise on invalid files; otherwise it is not.
    :param Optional[bool] log_setting: Passed on to the reader as its ``log_setting``. Default False.
    :return: A dictionary where the keys are video names and the values are dataframes containing the combined annotations for each video.
    :rtype: Dict[str, pd.DataFrame]
    :raises TypeError: If ``data_paths`` is neither a list nor a path.

    :example:
    >>> dfs = read_bento_files(data_paths=r"C:\troubleshooting\bento_test\bento_files", error_setting='WARNING', log_setting=False, video_info_df=r"C:\troubleshooting\bento_test\project_folder\logs\video_info.csv")
    """

    if error_setting is not None:
        check_str(name=f'{read_bento_files.__name__} error_setting', value=error_setting, options=(Methods.ERROR.value, Methods.WARNING.value))
    check_valid_boolean(value=log_setting, source=f'{read_bento_files.__name__} log_setting')
    raise_error = error_setting == Methods.ERROR.value
    # Accept every path type the signature allows, not only str.
    if isinstance(video_info_df, (str, os.PathLike)):
        check_file_exist_and_readable(file_path=video_info_df)
        video_info_df = read_video_info_csv(file_path=video_info_df)
    # Validate the dataframe before anything below reads from it.
    check_valid_dataframe(df=video_info_df, source=read_bento_files.__name__)
    if isinstance(data_paths, list):
        check_valid_lst(data=data_paths, source=f'{read_bento_files.__name__} data_paths', min_len=1, valid_dtypes=(str,))
    elif isinstance(data_paths, (str, os.PathLike)):
        check_if_dir_exists(in_dir=data_paths, source=f'{read_bento_files.__name__} data_paths')
        data_paths = find_files_of_filetypes_in_directory(directory=data_paths, extensions=['.annot'], raise_error=True)
    else:
        raise TypeError(f'{read_bento_files.__name__} data_paths has to be a list of file paths or a directory path, but got {type(data_paths)}')
    check_all_file_names_are_represented_in_video_log(video_info_df=video_info_df, data_paths=data_paths)
    dfs = {}
    for file_path in data_paths:
        _, video_name, _ = get_fn_ext(filepath=file_path)
        _, _, fps = read_video_info(vid_info_df=video_info_df, video_name=video_name)
        bento_dict = bento_file_reader(file_path=file_path, fps=fps, orient='columns', save_path=None, raise_error=raise_error, log_setting=log_setting)
        if not bento_dict:
            # Nothing was parsed from this file; pd.concat would raise on an empty collection.
            continue
        dfs[video_name] = pd.concat(list(bento_dict.values()), ignore_index=True)

    return dfs
error_setting: str, - video_info_df: pd.DataFrame, - log_setting: bool = False, -) -> Dict[str, pd.DataFrame]: - BENTO = "Bento" - CHANNEL = "Ch1----------" - - dfs = {} - for file_cnt, file_path in enumerate(data_paths): - _, video_name, ext = get_fn_ext(filepath=file_path) - _, _, fps = read_video_info(vid_info_df=video_info_df, video_name=video_name) - try: - data_df = pd.read_csv( - file_path, delim_whitespace=True, index_col=False, low_memory=False - ) - start_idx = data_df.index[data_df[BENTO] == CHANNEL].values[0] - sliced_annot = data_df.iloc[start_idx + 1 :] - clfs = sliced_annot[sliced_annot[BENTO].str.contains(">")]["Bento"].tolist() - video_events = [] - for clf_name in clfs: - start_idx = sliced_annot.index[ - sliced_annot[BENTO] == f"{clf_name}" - ].values[0] - clf_df = sliced_annot.loc[start_idx + 2 :, :] - end_idx = ( - clf_df.isnull()[clf_df.isnull().any(axis=1)].idxmax(axis=1).index - ) - if end_idx.values: - end_idx = end_idx.values[0] - else: - end_idx = max(clf_df.index + 1) - clf_df = ( - clf_df.loc[: end_idx - 1, :] - .reset_index(drop=True) - .drop("file", axis=1) - .astype(float) - ) - clf_df.columns = ["START", "STOP"] - clf_df = clf_df * fps - for obs in clf_df.values: - video_events.append([clf_name, "START", obs[0]]) - video_events.append([clf_name, "STOP", obs[1]]) - video_df = pd.DataFrame( - video_events, columns=["BEHAVIOR", "EVENT", "FRAME"] - ) - video_df["FRAME"] = video_df["FRAME"].astype(int) - video_df["BEHAVIOR"] = video_df["BEHAVIOR"].str[1:] - dfs[video_name] = video_df - except Exception as e: - if error_setting == Methods.WARNING.value: - ThirdPartyAnnotationsInvalidFileFormatWarning( - annotation_app="BENTO", file_path=file_path, log_status=log_setting - ) - elif error_setting == Methods.ERROR.value: - raise InvalidFileTypeError( - msg=f"{file_path} is not a valid BENTO file. See the docs for expected file format." 
def bento_file_reader(file_path: Union[str, os.PathLike],
                      fps: Optional[float] = None,
                      orient: Optional[Literal['index', 'columns']] = 'index',
                      save_path: Optional[Union[str, os.PathLike]] = None,
                      raise_error: Optional[bool] = False,
                      log_setting: Optional[bool] = False) -> Union[None, Dict[str, pd.DataFrame]]:

    r"""
    Read a BENTO annotation file into a dictionary with one dataframe per annotated behavior.

    Every line containing ``>`` starts a behavior section: the text after the first character is
    the behavior name, the next line is skipped as the column header, and the remaining lines up
    to the next section are tab-separated ``start``, ``stop``, ``duration`` rows. If every value
    of a section is a whole number the values are used as frame numbers; otherwise they are
    treated as seconds and multiplied by the FPS. A section without rows yields an empty dataframe.

    :param Union[str, os.PathLike] file_path: Path to the BENTO annotation file.
    :param Optional[float] fps: Frames per second used to convert second-based annotations to frames. If None and needed, it is read from the ``Annotation framerate`` line of the file.
    :param Optional[Literal['index', 'columns']] orient: 'index' returns dataframes with 'START' and 'STOP' frame columns (one row per bout). 'columns' returns dataframes with 'BEHAVIOR', 'EVENT' and 'FRAME' columns (one row per start or stop event, sorted by frame).
    :param Optional[Union[str, os.PathLike]] save_path: Path to save the results as a pickle file. If None, the results are returned instead of saved.
    :param Optional[bool] raise_error: If True, an invalid file raises an error. If False, a warning is issued and the behaviors parsed so far (possibly none) are returned.
    :param Optional[bool] log_setting: Passed to the invalid-format warning as its ``log_status``.
    :return: A dictionary where the keys are behavior names and the values are dataframes, or None if ``save_path`` is passed.
    :rtype: Union[None, Dict[str, pd.DataFrame]]
    :raises InvalidInputError: If ``fps`` is passed but is not a number above 0, or (with ``raise_error``) a section holds non-numeric values.
    :raises NoDataError: With ``raise_error``, if the file has no behavior sections.
    :raises InvalidFileTypeError: With ``raise_error``, if the rows of a section do not split into three tab-separated columns.
    :raises FrameRangeError: If annotations are in seconds, ``fps`` is None, and the FPS cannot be read from the file.

    :example:
    >>> bento_file_reader(file_path=r"C:\troubleshooting\bento_test\bento_files\20240812_crumpling3.annot")
    """

    def _orient_columns_melt(df: pd.DataFrame, behavior: str) -> pd.DataFrame:
        # Reshape one START/STOP row per bout into one row per event, sorted by frame.
        df = df[['START', 'STOP']].astype(np.int32).reset_index()
        df = df.melt(id_vars='index', var_name=None).drop('index', axis=1)
        df["BEHAVIOR"] = behavior
        df.columns = ["EVENT", "FRAME", 'BEHAVIOR']
        return df.sort_values(by='FRAME', ascending=True)[['BEHAVIOR', "EVENT", "FRAME"]].reset_index(drop=True)

    def _warn_invalid_format() -> None:
        ThirdPartyAnnotationsInvalidFileFormatWarning(annotation_app="BENTO", file_path=file_path, source=bento_file_reader.__name__, log_status=log_setting)

    def _read_lines() -> list:
        # Read the file line by line (dropping blank lines) rather than through a CSV parser, so
        # that commas inside a line cannot break parsing. latin-1 decodes any byte sequence, so
        # it is a fallback that can actually succeed when the file is not valid utf-8.
        for encoding in ('utf-8', 'latin-1'):
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    return [line for line in f.read().splitlines() if line.strip()]
            except UnicodeDecodeError:
                continue
        return []

    def _infer_fps(lines: list) -> float:
        # Read the FPS from the first "Annotation framerate: <value>" line of the file.
        try:
            fps_line = next(line for line in lines if 'Annotation framerate' in line)
            inferred_fps = float(fps_line.split(':')[1])
            if not inferred_fps > 0:
                raise ValueError(inferred_fps)
        except (StopIteration, IndexError, ValueError):
            raise FrameRangeError(f'The annotations are in seconds and FPS was not passed. FPS could also not be read from the BENTO file', source=bento_file_reader.__name__)
        return inferred_fps

    check_file_exist_and_readable(file_path=file_path)
    check_str(name=f'{bento_file_reader.__name__} orient', value=orient, options=('index', 'columns'))
    if fps is not None:
        # fps is a float parameter (e.g. 29.97), so it is validated as a positive number
        # rather than as an integer.
        fps_is_number = isinstance(fps, (int, float, np.integer, np.floating)) and not isinstance(fps, bool)
        if not fps_is_number or not fps > 0:
            raise InvalidInputError(msg=f'{bento_file_reader.__name__} fps has to be a number above 0, but got {fps}')
    lines = _read_lines()
    section_starts = [line_idx for line_idx, line in enumerate(lines) if '>' in line]
    results = {}
    if len(section_starts) == 0:
        if raise_error:
            raise NoDataError(f"{file_path} is not a valid BENTO file. See the docs for expected file format.", source=bento_file_reader.__name__)
        _warn_invalid_format()
        return results
    # Each section runs from its '>' line up to the next '>' line (or the end of the file).
    section_ends = section_starts[1:] + [len(lines)]
    for section_start, section_end in zip(section_starts, section_ends):
        clf_name = lines[section_start][1:]
        # Skip the behavior-name line and the column-header line that follows it.
        bout_rows = lines[section_start + 2:section_end]
        if bout_rows:
            out_clf_df = pd.Series(bout_rows, dtype=str).str.split('\t', expand=True)
            if len(out_clf_df.columns) != 3:
                if raise_error:
                    raise InvalidFileTypeError(msg=f'SimBA found {len(out_clf_df.columns)} columns for file {file_path} and classifier {clf_name} when trying to split the data by tabs.')
                _warn_invalid_format()
                return results
            numeric_check = list(out_clf_df.apply(lambda s: pd.to_numeric(s, errors='coerce').notnull().all()))
            if False in numeric_check:
                if raise_error:
                    raise InvalidInputError(msg=f'SimBA found values in the annotation data for behavior {clf_name} in file {file_path} that could not be interpreted as numeric values (seconds or frame numbers)')
                _warn_invalid_format()
                return results
            out_clf_df.columns = ['START', 'STOP', 'DURATION']
            # float64 keeps large frame numbers exact.
            out_clf_df = out_clf_df.astype(np.float64)
        else:
            # Behavior listed in the file without any annotated bouts.
            out_clf_df = pd.DataFrame(columns=['START', 'STOP', 'DURATION'], dtype=np.float64)
        # Whole numbers everywhere -> values are frame numbers; otherwise they are seconds.
        values_are_frames = np.array_equal(out_clf_df, out_clf_df.astype(np.int64))
        if not values_are_frames:
            if fps is None:
                fps = _infer_fps(lines)
            out_clf_df["START"] = out_clf_df["START"] * fps
            out_clf_df["STOP"] = out_clf_df["STOP"] * fps
        if orient == 'index':
            results[clf_name] = out_clf_df[['START', 'STOP']].astype(np.int32)
        else:
            results[clf_name] = _orient_columns_melt(df=out_clf_df, behavior=clf_name)
    if save_path is None:
        return results
    write_pickle(data=results, save_path=save_path)
    return None