diff --git a/simba/mixins/statistics_mixin.py b/simba/mixins/statistics_mixin.py index 619391253..fead3e9a5 100644 --- a/simba/mixins/statistics_mixin.py +++ b/simba/mixins/statistics_mixin.py @@ -18,12 +18,13 @@ types) from scipy import stats from scipy.stats.distributions import chi2 +from statsmodels.stats.libqsturng import psturng +from statsmodels.stats.multicomp import pairwise_tukeyhsd from sklearn.covariance import EllipticEnvelope from sklearn.ensemble import IsolationForest from simba.mixins.feature_extraction_mixin import FeatureExtractionMixin -from simba.utils.checks import (check_float, check_int, check_str, - check_valid_array, check_valid_dataframe) +from simba.utils.checks import (check_float, check_int, check_str, check_valid_array, check_valid_dataframe, check_valid_lst) from simba.utils.data import bucket_data, fast_mean_rank from simba.utils.enums import Formats, Options from simba.utils.errors import CountError, InvalidInputError @@ -409,7 +410,7 @@ def one_way_anova( :rtype: Tuple[float, float] :example: - >>> sample_1 = np.array([1, 2, 3, 1, 3, 2, 1, 10, 8, 4, 10]) + >>> sample_1 = np.array([1, 2, 3, 1, 3, 2, 1, 10, 8, 4, 10]) >>> sample_2 = np.array([8, 5, 5, 8, 8, 9, 10, 1, 7, 10, 10]) >>> Statistics().one_way_anova(sample_1=sample_2, sample_2=sample_1) """ @@ -4377,3 +4378,113 @@ def sliding_iqr(x: np.ndarray, window_size: float, sample_rate: float) -> np.nda results[r - 1] = upper_val - lower_val return results + @staticmethod + def one_way_anova_scipy(x: np.ndarray, + y: np.ndarray, + variable_names: List[str], + x_name: str = '', + y_name: str = '') -> pd.DataFrame: + """ + Compute one-way ANOVAs comparing each column (axis 1) of two arrays. + + .. note:: + Use for computing and presenting aggregate statistics. Not suitable for featurization. + + .. seealso:: + For featurization instead use :func:`simba.mixins.statistics_mixin.Statistics.rolling_one_way_anova` or + :func:`simba.mixins.statistics_mixin.Statistics.one_way_anova` + + :param np.ndarray x: First 2d array with observations rowwise and variables columnwise. + :param np.ndarray y: Second 2d array with observations rowwise and variables columnwise. Must have the same number of columns as x. + :param List[str, ...] variable_names: Names of the columnwise variables. Must have the same length as the number of data columns. + :param str x_name: Name of the first group (x). + :param str y_name: Name of the second group (y). + :return: Dataframe with one row per column holding the ANOVA F-statistic and P-value comparing that variable between x and y.
+ :rtype: pd.DataFrame + """ + + check_valid_array(data=x, source=f'{Statistics.one_way_anova_scipy.__name__} x', accepted_ndims=(2,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=y, source=f'{Statistics.one_way_anova_scipy.__name__} y', accepted_ndims=(2,), accepted_dtypes=Formats.NUMERIC_DTYPES.value, accepted_axis_1_shape=(x.shape[1],)) + check_str(name=f'{Statistics.one_way_anova_scipy.__name__} x_name', value=x_name, allow_blank=True) + check_str(name=f'{Statistics.one_way_anova_scipy.__name__} y_name', value=y_name, allow_blank=True) + check_valid_lst(source=f'{Statistics.one_way_anova_scipy.__name__} variable_names', data=variable_names, valid_dtypes=(str,), exact_len=x.shape[1]) + results = pd.DataFrame(variable_names, columns=['FEATURE']) + results[['GROUP_1', 'GROUP_2']] = x_name, y_name + results['F-STATISTIC'], results['P-VALUE'] = stats.f_oneway(x, y) + + results['P-VALUE'] = results['P-VALUE'].round(8) + + return results + + @staticmethod + def kruskal_scipy(x: np.ndarray, + y: np.ndarray, + variable_names: List[str], + x_name: str = '', + y_name: str = '') -> pd.DataFrame: + """ + Compute Kruskal-Wallis comparing each column (axis 1) on two arrays. + + .. notes:: + Use for computing and presenting aggregate statistics. Not suitable for featurization. + + .. seealso:: + For featurization instead use :func:`simba.mixins.statistics_mixin.Statistics.kruskal_wallis` + + :param np.ndarray x: First 2d array with observations rowwise and variables columnwise. + :param np.ndarray y: Second 2d array with observations rowwise and variables columnwise. Must be same number of columns as x. + :param List[str, ...] variable_names: Names of columnwise variable names. Same length as number of data columns. + :param str x_name: Name of the first group (x). + :param str y_name: Name of the second group (y). + :return: Dataframe with one row per column representing the Kruskal-Wallis statistic and P-values comparing the variables between x and y. + :rtype: pd.DataFrame + """ + + check_valid_array(data=x, source=f'{Statistics.kruskal_scipy.__name__} x', accepted_ndims=(2,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=y, source=f'{Statistics.kruskal_scipy.__name__} y', accepted_ndims=(2,), accepted_dtypes=Formats.NUMERIC_DTYPES.value, accepted_axis_1_shape=(x.shape[1],)) + check_str(name=f'{Statistics.kruskal_scipy.__name__} x_name', value=x_name, allow_blank=True) + check_str(name=f'{Statistics.kruskal_scipy.__name__} y_name', value=y_name, allow_blank=True) + check_valid_lst(source=f'{Statistics.kruskal_scipy.__name__} variable_names', data=variable_names, valid_dtypes=(str,), exact_len=x.shape[1]) + results = pd.DataFrame(variable_names, columns=['FEATURE']) + results[['GROUP_1', 'GROUP_2']] = x_name, y_name + results['STATISTIC'], results['P-VALUE'] = stats.kruskal(x, y) + + results['P-VALUE'] = results['P-VALUE'].round(8) + + return results + + + + @staticmethod + def pairwise_tukeyhsd_scipy(data: np.ndarray, + group: np.ndarray, + variable_names: List[str], + verbose: bool = False) -> pd.DataFrame: + + """ + Compute pairwise grouped Tukey-HSD tests. + + .. notes:: + Use for computing and presenting aggregate statistics. Not suitable for featurization. + + :param np.ndarray data: 2D array with observations rowwise (axis 0) and features columnwise (axis 1) + :param np.ndarray group: 1D array with the same number of observations as rows in ``data`` containing the group for each sample. + :param List[str, ...] 
variable_names: Names of the columnwise variables. Must have the same length as the number of data columns. + :param bool verbose: If True, prints progress while computing the per-variable Tukey-HSD tests. Default: False. + :return: Dataframe comparing each group for each variable. + :rtype: pd.DataFrame + """ + + check_valid_array(data=data, source=f'{Statistics.pairwise_tukeyhsd_scipy.__name__} data', accepted_ndims=(2,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=group, source=f'{Statistics.pairwise_tukeyhsd_scipy.__name__} group', accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value, accepted_axis_0_shape=(data.shape[0],)) + check_valid_lst(source=f'{Statistics.pairwise_tukeyhsd_scipy.__name__} variable_names', data=variable_names, valid_dtypes=(str,), exact_len=data.shape[1]) + results = [] + for var in range(data.shape[1]): + if verbose: + print(f'Computing Tukey HSD for variable {var+1}/{data.shape[1]}...') + tukey_data = pairwise_tukeyhsd(data[:, var], group) + df = pd.DataFrame(data=tukey_data._results_table.data[1:], columns=tukey_data._results_table.data[0]) + df['P-VALUE'] = psturng(np.abs(tukey_data.meandiffs / tukey_data.std_pairs), len(tukey_data.groupsunique), tukey_data.df_total) + df['FEATURE'] = variable_names[var] + results.append(df) + + return pd.concat(results, axis=0) \ No newline at end of file diff --git a/simba/sandbox/bg_remover.py b/simba/sandbox/bg_remover.py new file mode 100644 index 000000000..cca3d78d0 --- /dev/null +++ b/simba/sandbox/bg_remover.py @@ -0,0 +1,117 @@ +import os +from copy import deepcopy +from typing import Optional, Tuple, Union + +import cv2 +import numpy as np +try: + from typing import Literal +except: + from typing_extensions import Literal + +from simba.utils.checks import (check_file_exist_and_readable,check_if_dir_exists) +from simba.utils.enums import Formats +from simba.utils.printing import SimbaTimer, stdout_success +from simba.utils.read_write import (get_fn_ext, get_video_meta_data) +from simba.video_processors.video_processing import create_average_frm + +def video_bg_subtraction(video_path: Union[str, os.PathLike], + bg_video_path: Optional[Union[str, os.PathLike]] = None, + bg_start_frm: Optional[int] = None, + bg_end_frm: Optional[int] = None, + bg_start_time: Optional[str] = None, + bg_end_time: Optional[str] = None, + bg_color: Optional[Tuple[int, int, int]] = (0, 0, 0), + fg_color: Optional[Tuple[int, int, int]] = None, + save_path: Optional[Union[str, os.PathLike]] = None, + threshold: Optional[int] = 50, + verbose: Optional[bool] = True) -> None: + """ + Subtract the background from a video. + + .. video:: _static/img/video_bg_subtraction.webm + :width: 800 + :autoplay: + :loop: + + .. video:: _static/img/bg_remover_example_1.webm + :width: 800 + :autoplay: + :loop: + + .. video:: _static/img/bg_remover_example_2.webm + :width: 800 + :autoplay: + :loop: + + .. note:: + If ``bg_video_path`` is passed, that video will be used to parse the background. If None, ``video_path`` will be used to parse the background. + Either pass ``bg_start_frm`` and ``bg_end_frm`` OR ``bg_start_time`` and ``bg_end_time`` OR pass all four arguments as None. + The passed pair will be used to slice the background video, and the sliced part is used to parse the background. + + For example, if there is **no** animal in the ``video_path`` video for the first 20s, then the first 20s can be used to parse the background. + In this scenario, ``bg_video_path`` can be passed as ``None`` and ``bg_start_time`` and ``bg_end_time`` can be ``00:00:00`` and ``00:00:20``, respectively.
+ + If animals are present throughout the entire ``video_path`` video, pass ``bg_video_path`` as a path to a video recording of the arena without the animals. + + :param Union[str, os.PathLike] video_path: The path to the video to remove the background from. + :param Optional[Union[str, os.PathLike]] bg_video_path: Path to the video which contains a segment with the background only. If None, then ``video_path`` will be used. + :param Optional[int] bg_start_frm: The first frame in the background video to use when creating a representative background image. Default: None. + :param Optional[int] bg_end_frm: The last frame in the background video to use when creating a representative background image. Default: None. + :param Optional[str] bg_start_time: The start timestamp in `HH:MM:SS` format in the background video to use to create a representative background image. Default: None. + :param Optional[str] bg_end_time: The end timestamp in `HH:MM:SS` format in the background video to use to create a representative background image. Default: None. + :param Optional[Tuple[int, int, int]] bg_color: The RGB color of the background in the output video. Defaults to black (0, 0, 0). + :param Optional[Tuple[int, int, int]] fg_color: The RGB color of the moving objects (foreground) in the output video. Defaults to None, which keeps the original colors of the moving objects. + :param Optional[Union[str, os.PathLike]] save_path: The path to where to save the output video with the background removed. If None, saves the output video in the same directory as the input video with the ``_bg_subtracted`` suffix. Default: None. + :param Optional[int] threshold: Grayscale difference threshold (0-255); pixels that differ from the background by at least this value are treated as foreground. Default: 50. + :param Optional[bool] verbose: If True, prints frame-by-frame progress. Default: True. + :return: None. + + :example: + >>> video_bg_subtraction(video_path='/Users/simon/Downloads/1_LH_cropped.mp4', bg_start_time='00:00:00', bg_end_time='00:00:10', bg_color=(0, 106, 167), fg_color=(254, 204, 2)) + """ + + timer = SimbaTimer(start=True) + check_file_exist_and_readable(file_path=video_path) + if bg_video_path is None: + bg_video_path = deepcopy(video_path) + video_meta_data = get_video_meta_data(video_path=video_path) + dir, video_name, ext = get_fn_ext(filepath=video_path) + if save_path is None: + save_path = os.path.join(dir, f'{video_name}_bg_subtracted{ext}') + else: + check_if_dir_exists(in_dir=os.path.dirname(save_path), source=video_bg_subtraction.__name__) + fourcc = cv2.VideoWriter_fourcc(*Formats.MP4_CODEC.value) + writer = cv2.VideoWriter(save_path, fourcc, video_meta_data['fps'],(video_meta_data['width'], video_meta_data['height'])) + bg_frm = create_average_frm(video_path=bg_video_path, start_frm=bg_start_frm, end_frm=bg_end_frm, start_time=bg_start_time, end_time=bg_end_time) + bg_frm = cv2.resize(bg_frm, (video_meta_data['width'], video_meta_data['height'])) + cap = cv2.VideoCapture(video_path) + frm_cnt = 0 + while True: + ret, frm = cap.read() + if ret: + out_img = np.full_like(frm, fill_value=bg_color) + img_diff = np.abs(frm - bg_frm) + gray_diff = cv2.cvtColor(img_diff, cv2.COLOR_BGR2GRAY) + mask = np.where(gray_diff < threshold, 0, 1) + if fg_color is None: + out_img[mask == 1] = frm[mask == 1] + else: + out_img[mask == 1] = fg_color + writer.write(out_img) + frm_cnt += 1 + if verbose: + print(f'Background subtraction frame {frm_cnt}/{video_meta_data["frame_count"]} (Video: {video_name})') + else: + break + + writer.release() + cap.release() + timer.stop_timer() + if verbose: + stdout_success(msg=f'Background subtracted from {video_name} and saved at {save_path}', elapsed_time=timer.elapsed_time) + + + 
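+# Illustrative sketch (not part of the module's tested examples): the docstring note above describes
+# the case where the animal is present for the whole recording; in that scenario a separate arena-only
+# clip can be passed via ``bg_video_path``. The file paths below are placeholders.
+# video_bg_subtraction(video_path='/path/to/experiment_video.mp4',
+#                      bg_video_path='/path/to/empty_arena_video.mp4',
+#                      fg_color=(255, 255, 255))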
+video_bg_subtraction(video_path='/Users/simon/Desktop/envs/simba/troubleshooting/mitra/project_folder/videos/501_MA142_Gi_CNO_0514_clipped.mp4', + fg_color=(255, 0, 0), threshold=255) \ No newline at end of file diff --git a/simba/sandbox/bout_aggregator.py b/simba/sandbox/bout_aggregator.py new file mode 100644 index 000000000..2bf64a7ef --- /dev/null +++ b/simba/sandbox/bout_aggregator.py @@ -0,0 +1,63 @@ +import os +from copy import deepcopy +from typing import Literal, Optional, List, Union +try: + from typing import Literal +except: + from typing_extensions import Literal +import pandas as pd +from simba.utils.checks import check_valid_lst, check_int, check_str, check_valid_dataframe, check_instance +from simba.utils.read_write import find_core_cnt, read_video_info +from simba.utils.printing import SimbaTimer +from simba.utils.enums import Formats +from simba.utils.data import detect_bouts, read_df +from simba.utils.errors import InvalidInputError + +def video_bout_aggregator(data: Union[str, os.PathLike, pd.DataFrame], + clfs: List[str], + feature_names: List[str], + sample_rate: int, + min_bout_length: Optional[int] = None, + method: Optional[Literal["MEAN", "MEDIAN"]] = "MEAN") -> pd.DataFrame: + + check_valid_lst(data=clfs, source=f"{video_bout_aggregator.__name__} clfs", valid_dtypes=(str,), min_len=1) + check_valid_lst(data=feature_names, source=f"{video_bout_aggregator.__name__} feature_names", valid_dtypes=(str,), min_len=1) + check_instance(source=f'{video_bout_aggregator.__name__} data', accepted_types=(str, pd.DataFrame), instance=data) + if isinstance(data, (str, os.PathLike)): + df = read_df(file_path=data_path, file_type='csv', usecols=feature_names + clfs) + elif isinstance(data, (pd.DataFrame)): + df = deepcopy(data) + else: + raise InvalidInputError(msg=f'data is of invalid type: {type(df)}, accepted: {str, os.PathLike, pd.DataFrame}', source=video_bout_aggregator.__name__) + check_valid_dataframe(df=data, source=f"{video_bout_aggregator.__name__} data", valid_dtypes=Formats.NUMERIC_DTYPES.value, required_fields=feature_names + clfs) + check_int(name=f"{video_bout_aggregator.__name__} data", value=sample_rate, min_value=10e-6) + if min_bout_length is not None: + check_int(name=f"{video_bout_aggregator.__name__} min_bout_length", value=min_bout_length, min_value=0) + else: + min_bout_length = 0 + check_str(name=f"{video_bout_aggregator.__name__} method", value=method, options=("MEAN", "MEDIAN")) + + +# timer = SimbaTimer(start=True) + # core_cnt = find_core_cnt()[1] + # print("Calculating bout aggregate statistics...") + + # check_valid_dataframe(df=data, source=f"{video_bout_aggregator.__name__} data", required_fields=feature_names + clfs, valid_dtypes=Formats.NUMERIC_DTYPES.value) + # check_valid_dataframe(df=video_info, source=f"{video_bout_aggregator.__name__} video_info", required_fields=['fps', 'video'], valid_dtypes=Formats.NUMERIC_DTYPES.value) + # if min_bout_length is not None: + # check_int(name=f"{video_bout_aggregator.__name__} min_bout_length", value=min_bout_length, min_value=0) + # check_str(name=f"{video_bout_aggregator.__name__} aggregator", value=aggregator, options=("MEAN", "MEDIAN")) + # _, _, fps = read_video_info(vid_info_df=video_info, video_name=video) + # + # + # detect_bouts(data_df=data, target_lst=clfs) + # + # + # for cnt, video in enumerate(data["VIDEO"].unique()): + # print(f'Processing video {video} ({str(cnt+1)}/{str(len(data["VIDEO"].unique()))})...') + + + +data_path = 
'/Users/simon/Desktop/envs/simba/troubleshooting/mitra/project_folder/csv/input_csv/501_MA142_Gi_CNO_0521.csv' + +video_bout_aggregator(data=data_path) diff --git a/simba/sandbox/direction_reversals.py b/simba/sandbox/direction_reversals.py new file mode 100644 index 000000000..fd59217cc --- /dev/null +++ b/simba/sandbox/direction_reversals.py @@ -0,0 +1,24 @@ +import numpy as np + + + +def direction_switches(x: np.ndarray, switch_degree: int = 180): + + idx = 0 + cDeg = x[idx] + tDeg1, tDeg2 = ((cDeg + switch_degree) % 360 + 360) % 360 + + + print(cDeg) + print(tDeg1) + + + + + pass + + + + +x = np.random.randint(0, 361, (100)) +direction_switches(x=x) diff --git a/simba/sandbox/egocentric_align_nb.py b/simba/sandbox/egocentric_align_nb.py new file mode 100644 index 000000000..861fb48cc --- /dev/null +++ b/simba/sandbox/egocentric_align_nb.py @@ -0,0 +1,12 @@ +# In this notebook, we will "egocentrically" align pose estimation and pose-estimated video data. + +# This means that we will rotate the data, so that the animal, in every frame, is always "anchored" in the same location and directing to the same location. +# (i) One body-part (e.g., the center or the tail-base of the animal is always located in the same pixel location of the video. +# (ii) A second body-part (e.g., the nose, head, or nape) is always directing N degrees from the anchor point. + +# In short - we rotate the data so that the animal is always facing to the right, and the animal is always located at +# the center of the image. + + + + diff --git a/simba/sandbox/felzenszwalb.py b/simba/sandbox/felzenszwalb.py new file mode 100644 index 000000000..79ac1fbaf --- /dev/null +++ b/simba/sandbox/felzenszwalb.py @@ -0,0 +1,77 @@ +from typing import Union, Optional, List, Tuple +import os +import numpy as np +import cv2 + +from simba.utils.read_write import (get_video_meta_data, find_core_cnt, get_fn_ext, read_frm_of_video, concatenate_videos_in_folder) +#from simba.utils.checks import +import multiprocessing +import functools +from simba.utils.enums import Defaults, Formats +from skimage.color import label2rgb +from skimage.segmentation import felzenszwalb +from simba.utils.printing import SimbaTimer + +def _felzenszwalb_helper(frm_range: Tuple[int, List[int]], + scale: int, + min_size: int, + sigma: float, + save_dir: Union[str, os.PathLike], + video_path: Union[str, os.PathLike]): + + video_cap = cv2.VideoCapture(video_path) + video_meta_data = get_video_meta_data(video_path=video_path) + batch, start_frm, end_frm = frm_range[0], frm_range[1][0], frm_range[1][-1] + save_path = os.path.join(save_dir, f'{batch}.mp4') + fourcc = cv2.VideoWriter_fourcc(*Formats.MP4_CODEC.value) + writer = cv2.VideoWriter(save_path, fourcc, video_meta_data["fps"], (video_meta_data["width"], video_meta_data["height"])) + for frm_idx in range(start_frm, end_frm): + print(f'Frame {frm_idx}/{end_frm}, Batch {batch}...') + img = read_frm_of_video(video_path=video_cap, frame_index=frm_idx) + segments = felzenszwalb(img, scale=scale, sigma=sigma, min_size=min_size) + compressed_frame = label2rgb(segments, img, kind='avg') + compressed_frame_bgr = (compressed_frame * 255).astype(np.uint8) + writer.write(compressed_frame_bgr) + writer.release() + return batch + + #imgs = ImageMixin.read_img_batch_from_video(video_path=video_path, start_frm=start_frm, end_frm=end_frm, core_cnt=1) + + +def felzenszwalb_video(video_path: Union[str, os.PathLike], + save_path: Optional[Union[str, os.PathLike]] = None, + scale: Optional[int] = 100, + sigma: Optional[float] = 
0.5, + min_size: Optional[int] = 100, + core_cnt: Optional[int] = -1): + + """ + :param Optional[int] scale: Path to the video file. + + :example: + >>> felzenszwalb_video(video_path='/Users/simon/Desktop/envs/simba/troubleshooting/mitra/project_folder/videos/704_MA115_Gi_CNO_0521_clipped.mp4', save_path='/Users/simon/Desktop/feltz/test.mp4') + """ + + timer = SimbaTimer(start=True) + video_meta_data = get_video_meta_data(video_path=video_path) + if core_cnt == -1: core_cnt = find_core_cnt()[0] + frm_ranges = np.array_split(np.arange(0, video_meta_data['frame_count']+1), core_cnt) + frm_ranges = [(y, x) for y, x in enumerate(frm_ranges)] + out_dir, out_name, _= get_fn_ext(filepath=save_path) + temp_folder = os.path.join(out_dir, "temp") + if not os.path.isdir(temp_folder): os.makedirs(temp_folder) + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + constants = functools.partial(_felzenszwalb_helper, + video_path=video_path, + save_dir=temp_folder, + scale=scale, + sigma=sigma, + min_size=min_size) + for cnt, core_batch in enumerate(pool.map(constants, frm_ranges, chunksize=1)): + print(f'Core batch {core_batch} complete...') + pass + timer.stop_timer() + concatenate_videos_in_folder(in_folder=temp_folder, save_path=save_path) + + +felzenszwalb_video(video_path='/Users/simon/Desktop/envs/simba/troubleshooting/mitra/project_folder/videos/704_MA115_Gi_CNO_0521_clipped.mp4', save_path='/Users/simon/Desktop/feltz/test.mp4') \ No newline at end of file diff --git a/simba/sandbox/gibbs_sampler.py b/simba/sandbox/gibbs_sampler.py new file mode 100644 index 000000000..f874ef171 --- /dev/null +++ b/simba/sandbox/gibbs_sampler.py @@ -0,0 +1,150 @@ +import os +from typing import Union, Tuple +import numpy as np +import pandas as pd + +from simba.utils.printing import SimbaTimer, stdout_success +from simba.utils.checks import check_valid_array, check_float, check_int, check_if_dir_exists + +class GibbSampler(): + def __init__(self, + data: np.ndarray, + save_path: Union[str, os.PathLike], + sequence_length: int = 4, + iterations: int = 1500, + epochs: int = 2, + stop_val: float = 0.001, + pseudo_number: float = 10e-6, + plateau_val: int = 50): + + """ + Gibbs sampling for finding "motifs" in categorical sequences. + + :param np.ndarray data: Two dimensional array where observations are organised by row and each sequential sample in the observation is organized by column. + :param Union[str, os.PathLike] save_path: The path location where to save the CSV results. + :param int sequence_length: The length of the motif sequence searched for. + :param int iterations: Number of iterations per epoch. Default: 1500. + :param int epochs: Number of epochs of iterations. Default: 4. + :param float stop_val: Terminate once the error value reaches below this threshold. Default 0.001. + :param int plateau_val: Terminate epoch when error rate has remained unchanged for ``plateau_val`` count of iterations. Default 50. + :param float pseudo_number: Small error value for fuzzy search and avoid division by zero errors. Default: 10e-6. + + :example: + >>> data = pd.read_csv(r"/Users/simon/Desktop/envs/simba/simba/tests/data/sample_data/gibbs_sample_cardinal.csv", index_col=0).values + >>> sampler = GibbSampler(data=data, save_path=r'/Users/simon/Desktop/gibbs.csv', epochs=5, iterations=600) + >>> sampler.run() + + :references: + .. [1] Lawrence et.al, Detecting Subtle Sequence Signals: a Gibbs Sampling Strategy for Multiple Alignment, Science, vol. 262, pp. 208-214, 1993. 
+ .. [2] Great YouTube tutorial / explanation by Xiaole Shirley Liu - `https://www.youtube.com/watch?v=NRjhfyXWHuQ `_. + .. [3] Weinreb et al. Keypoint-MoSeq: parsing behavior by linking point tracking to pose dynamics, Nature Methods, 21, 1329–1339 (2024). + """ + + + check_valid_array(data=data, source=f'{self.__class__.__name__} data', accepted_ndims=(2,)) + check_float(name=f'{self.__class__.__name__} stop_val', value=stop_val, min_value=0.0) + check_float(name=f'{self.__class__.__name__} pseudo_number', value=pseudo_number, min_value=10e-16) + check_int(name=f'{self.__class__.__name__} epochs', value=epochs, min_value=1) + check_int(name=f'{self.__class__.__name__} iterations', value=iterations, min_value=1) + check_int(name=f'{self.__class__.__name__} sequence_length', value=sequence_length, min_value=2) + check_if_dir_exists(in_dir=os.path.dirname(save_path), source=f'{self.__class__.__name__} save_path') + self.unique_vals = np.sort(np.unique(data.flatten())) + self.target_probability = 1 * sequence_length + (pseudo_number * (sequence_length + 1)) + self.holdout_fields = [f"H_{i}" for i in range(sequence_length)] + self.non_holdout_cols = [f"C_{i}" for i in range(sequence_length)] + self.out_cols = [f"Behavior_{i+1}" for i in range(sequence_length)] + self.summary_df = pd.DataFrame(columns=self.out_cols) + self.data, self.pseudo_num, self.sequence_len, self.plateau_val = (data, pseudo_number, sequence_length, plateau_val) + self.iterations, self.stop_val, self.epochs = [iterations] * epochs, stop_val, epochs + self.save_path = save_path + + def __make_probability_table(self, data: np.ndarray) -> pd.DataFrame: + prob_df = pd.DataFrame(columns=self.unique_vals) + for field_idx in range(data.shape[1]): + idx_data = data[:, field_idx].flatten() + for unique_val in self.unique_vals: + val_idx = np.argwhere(idx_data == unique_val).flatten() + pct = (val_idx.shape[0] + self.pseudo_num) / (idx_data.shape[0] + self.pseudo_num) + prob_df.loc[field_idx, unique_val] = pct + return prob_df + + def __iterate_through_possible_seq_in_hold_out(self, + holdout_obs: np.ndarray, + bg_dict: dict, + probability_df: pd.DataFrame) -> pd.DataFrame: + + out_df = pd.DataFrame(columns=self.holdout_fields + ["weight"]) + for r in range(self.sequence_len, holdout_obs.shape[0]+1): + sequence = holdout_obs[r - self.sequence_len:r] + prob_tot = 1 + for i in range(self.sequence_len): + val = sequence[i] + prob_tot *= probability_df.loc[i][val] / bg_dict[val] + out_df.loc[len(out_df)] = np.append(sequence, prob_tot) + return out_df + + def __sum_results(self, full_sequence_set, summary_df) -> Tuple[pd.DataFrame, pd.DataFrame]: + full_sequence_set = pd.DataFrame(full_sequence_set, columns=self.out_cols) + summary_df = pd.concat([summary_df, full_sequence_set], axis=0).reset_index(drop=True) + output = (summary_df.groupby(summary_df.columns.tolist()).size().reset_index().rename(columns={0: "records"}).sort_values(by=["records"], ascending=False)) + output["percent"] = output["records"] / output["records"].sum() + return summary_df, output.reset_index(drop=True) + + def run(self): + timer = SimbaTimer(start=True) + unique_elements, counts = np.unique(self.data, return_counts=True) + counts_dict = dict(zip(unique_elements, counts)) + bg_dict = {k: (v / np.sum(counts)) for k, v in counts_dict.items()} + prior_error, stable_error_cnt, full_sequence, output = np.inf, 0, None, None + + for epoch_cnt, iterations in enumerate(self.iterations): + holdout_idx = np.random.randint(0, self.data.shape[0]-1) + for iteration in 
range(0, iterations): + hold_out_obs = self.data[holdout_idx] + if iteration == 0: + start_pos = np.random.randint(low=0, high=data.shape[1] - self.sequence_len, size=(data.shape[0])) + end_pos = np.array([x + self.sequence_len for x in start_pos]) + slices = np.array([[x, y] for x, y in zip(start_pos, end_pos)]) + epoch_sample = np.delete(self.data, [holdout_idx], axis=0) + sequences = np.full(shape=(epoch_sample.shape[0], self.sequence_len), fill_value='-1.0') + for idx in range(epoch_sample.shape[0]): + i = slices[idx] + sequences[idx] = epoch_sample[idx][i[0]: i[1]] + else: + sequences = np.delete(full_sequence, [holdout_idx], axis=0) + probability_df = self.__make_probability_table(sequences) + df_holdout_seqs = self.__iterate_through_possible_seq_in_hold_out(hold_out_obs, bg_dict, probability_df) + new_sequence = df_holdout_seqs.sample(1, weights="weight")[self.holdout_fields].values[0] + full_sequence = np.insert(sequences, 0, new_sequence, axis=0) + error = round(self.target_probability - probability_df.max(axis=1).sum(), 4) + print(f"Current error: {error}, iteration: {iteration+1}/{iterations}, epoch: {epoch_cnt+1}/{self.epochs}") + if error <= self.stop_val: + print(f"Convergence reached. Error: {error}, Stop value: {self.stop_val}") + break + if error == prior_error: + stable_error_cnt += 1 + if stable_error_cnt >= self.plateau_val - 1: + print(f"Plateau reached. Stable error count: {stable_error_cnt}. Plateau value: {self.plateau_val}") + break + else: + stable_error_cnt = 0 + prior_error = error + if holdout_idx < (len(self.data) - 1): + holdout_idx += 1 + else: + holdout_idx = 0 + self.summary_df, output = self.__sum_results(full_sequence, self.summary_df) + + output.to_csv(self.save_path) + timer.stop_timer() + stdout_success(msg=f"Gibbs sampling results saved @{self.save_path}", elapsed_time=timer.elapsed_time_str) + + + +# data = pd.read_csv(r"C:\projects\simba\simba\tests\data\sample_data\gibbs_sample_cardinal.csv", index_col=0).values +# sampler = GibbSampler(data=data, save_path=r'C:\Users\sroni\OneDrive\Desktop\gibbs.csv', iterations=5) +# sampler.run() + +# data = pd.read_csv(r"/Users/simon/Desktop/envs/simba/simba/tests/data/sample_data/gibbs_sample_cardinal.csv", index_col=0).values +# sampler = GibbSampler(data=data, save_path=r'/Users/simon/Desktop/gibbs.csv', epochs=5, iterations=600) +# sampler.run() \ No newline at end of file diff --git a/simba/sandbox/img_edge_distance_flexible.py b/simba/sandbox/img_edge_distance_flexible.py new file mode 100644 index 000000000..8244b103b --- /dev/null +++ b/simba/sandbox/img_edge_distance_flexible.py @@ -0,0 +1,43 @@ +import numpy as np + + + +def img_edge_distances_flexible(data: np.ndarray, + edge_distances: np.ndarray, + pixels_per_mm: float, + img_resolution: np.ndarray, + time_window: float, + fps: int) -> np.ndarray: + + """ + """ + + results = np.full((data.shape[0], 4), np.nan) + left = img_resolution[1] * edge_distances[0] + top = img_resolution[0] * edge_distances[1] + right = img_resolution[1] * edge_distances[2] + bottom = img_resolution[0] * edge_distances[3] + print(edge_distances[0]) + edge_distances = np.array([int(left), int(top), int(right), int(bottom)]) + window_size = int(time_window * fps) + for r in range(window_size, data.shape[0] + 1): + l = r - window_size + w_data = data[l:r].reshape(-1, 2) + w_distances = np.full((4, w_data.shape[0]), np.nan) + for idx in range(w_data.shape[0]): + w_distances[0, idx] = np.linalg.norm(w_data[idx] - edge_distances[0]) + w_distances[1, idx] = 
np.linalg.norm(w_data[idx] - edge_distances[1]) + w_distances[2, idx] = np.linalg.norm(w_data[idx] - edge_distances[2]) + w_distances[3, idx] = np.linalg.norm(w_data[idx] - edge_distances[3]) + for i in range(4): + results[r - 1][i] = np.mean(w_distances[i]) / pixels_per_mm + + return results.astype(np.float32) + +data = np.array([[0, 0], [758, 540], [0, 540], [748, 540]]) +img_edge_distances_flexible(data=data, + edge_distances=np.array([0.0, 0.0, 0.0, 0.0]), + pixels_per_mm=2.13, + img_resolution=np.array([748, 540]), + time_window=1.0, + fps=1) diff --git a/simba/sandbox/preferred_turning_direction.py b/simba/sandbox/preferred_turning_direction.py new file mode 100644 index 000000000..d6dea8709 --- /dev/null +++ b/simba/sandbox/preferred_turning_direction.py @@ -0,0 +1,41 @@ +import numpy as np +from simba.utils.checks import check_valid_array +from simba.utils.enums import Formats +from simba.utils.errors import InvalidInputError +from simba.mixins.circular_statistics import CircularStatisticsMixin +from simba.utils.data import get_mode + + +def preferred_turning_direction(x: np.ndarray) -> int: + """ + Determines the preferred turning direction from a 1D array of circular directional data. + + .. note:: + The input ``x`` can be created using any of the following methods: + - :func:`simba.mixins.circular_statistics.CircularStatisticsMixin.direction_two_bps` + - :func:`simba.data_processors.cuda.circular_statistics.direction_from_two_bps` + - :func:`simba.data_processors.cuda.circular_statistics.direction_from_three_bps` + - :func:`simba.mixins.circular_statistics.CircularStatisticsMixin.direction_three_bps` + + :param np.ndarray x: 1D array of circular directional data (values between 0 and 360, inclusive). The array represents angular directions measured in degrees. + :return: + The most frequent turning direction from the input data: + - `0`: No change in the angular value between consecutive frames. + - `1`: An increase in the angular value (rotation in the positive direction, counterclockwise). + - `2`: A decrease in the angular value (rotation in the negative direction, clockwise). + :rtype: int + + :example: + >>> x = np.random.randint(0, 361, (200,)) + >>> preferred_turning_direction(x=x) + """ + + check_valid_array(data=x, source=preferred_turning_direction.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + if np.max(x) > 360 or np.min(x) < 0: + raise InvalidInputError(msg='x has to be values between 0 and 360 inclusive', source=preferred_turning_direction.__name__) + rotational_direction = CircularStatisticsMixin.rotational_direction(data=x.astype(np.float32)) + return get_mode(x=rotational_direction) + + + + diff --git a/simba/sandbox/radial_dispersion.py b/simba/sandbox/radial_dispersion.py new file mode 100644 index 000000000..f6318440e --- /dev/null +++ b/simba/sandbox/radial_dispersion.py @@ -0,0 +1,21 @@ +import numpy as np +from simba.mixins.statistics_mixin import Statistics +from simba.utils.checks import check_valid_array +from simba.utils.enums import Formats + +def radial_dispersion_index(x: np.ndarray, reference_point: np.ndarray) -> float: + """ + Compute the Radial Dispersion Index (RDI) for a set of points relative to a reference point. + + The RDI quantifies the variability in radial distances of points from the reference + point, normalized by the mean radial distance. + + :param np.ndarray x: 2-dimensional numpy array representing the input data with shape (n, m), where n is the number of frames and m is the coordinates. 
+ :param np.ndarray reference_point: A 1D array of shape (n_dimensions,) representing the reference point with respect to which the radial dispersion index is calculated. + :return: The radial dispersion index of the points. + :rtype: float + """ + + check_valid_array(data=x, source=f"{radial_dispersion_index.__name__} x", accepted_ndims=(2,), accepted_axis_1_shape=(2,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=reference_point, source=f"{radial_dispersion_index.__name__} reference_point", accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + radial_distances = np.linalg.norm(x - reference_point, axis=1) + return np.std(radial_distances) / np.mean(radial_distances) diff --git a/simba/sandbox/radial_eccentricity.py b/simba/sandbox/radial_eccentricity.py new file mode 100644 index 000000000..68656904a --- /dev/null +++ b/simba/sandbox/radial_eccentricity.py @@ -0,0 +1,34 @@ +import numpy as np +from simba.mixins.statistics_mixin import Statistics +from simba.utils.checks import check_valid_array +from simba.utils.enums import Formats + +def radial_eccentricity(x: np.ndarray, reference_point: np.ndarray): + """ + Compute the radial eccentricity of a set of points relative to a reference point. + + Radial eccentricity quantifies the degree of elongation in the spatial distribution of points. The value ranges between 0 and 1, where 0 indicates a perfectly circular distribution and values approaching 1 indicate a highly elongated or linear distribution. + + :param np.ndarray x: 2-dimensional numpy array representing the input data with shape (n, m), where n is the number of frames and m is the coordinates. + :param np.ndarray reference_point: A 1D array of shape (n_dimensions,) representing the reference point with respect to which the radial eccentricity is calculated. + :return: The radial eccentricity of the points. + :rtype: float + + :example: + >>> points = np.random.randint(0, 1000, (100000, 2)) + >>> reference_point = np.mean(points, axis=0) + >>> radial_eccentricity(x=points, reference_point=reference_point) + """ + + check_valid_array(data=x, source=f"{radial_eccentricity.__name__} x", accepted_ndims=(2,), accepted_axis_1_shape=(2,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=reference_point, source=f"{radial_eccentricity.__name__} reference_point", accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + centered_points = x - reference_point + cov_matrix = Statistics.cov_matrix(data=centered_points.astype(np.float32)) + eigenvalues, _ = np.linalg.eig(cov_matrix) + eigenvalues = np.sort(eigenvalues)[::-1] + return np.sqrt(1 - eigenvalues[1] / eigenvalues[0]) + + + + + + diff --git a/simba/sandbox/regression_errors.py b/simba/sandbox/regression_errors.py new file mode 100644 index 000000000..14c4f4d21 --- /dev/null +++ b/simba/sandbox/regression_errors.py @@ -0,0 +1,144 @@ +from typing import Optional +import numpy as np +from simba.utils.checks import check_valid_array, check_float +from simba.utils.enums import Formats + +def mean_absolute_percentage_error(y_true: np.ndarray, + y_pred: np.ndarray, + epsilon=1e-10, + weights: Optional[np.ndarray] = None) -> float: + """ + Compute the Mean Absolute Percentage Error (MAPE). + + :param np.ndarray y_true: The array containing the true values (dependent variable) of the dataset. Should be a 1D numeric array of shape (n,). + :param np.ndarray y_pred: The array containing the predicted values for the dataset. Should be a 1D numeric array of shape (n,) and of the same length as `y_true`.
+ :param float epsilon: A small pseudovalue to replace zeros in `y_true` to avoid division by zero errors. + :param Optional[np.ndarray] weights: An optional 1D array of weights to apply to each error. If provided, the weighted mean absolute percentage error is computed. + :return: The Mean Absolute Percentage Error (MAPE) as a float, in percentage format. A lower value indicates better prediction accuracy. + :rtype: float + + :example: + >>> x, y = np.random.random(size=(100000,)), np.random.random(size=(100000,)) + >>> mean_absolute_percentage_error(y_true=x, y_pred=y) + """ + + check_valid_array(data=y_true, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=y_pred, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0],accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_float(name=mean_absolute_percentage_error.__name__, value=epsilon) + y_true = np.where(y_true == 0, epsilon, y_true) + se = np.abs((y_true - y_pred) / y_true) + if weights is not None: + check_valid_array(data=weights, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + se = se * weights + return (np.sum(se) / np.sum(weights)) * 100 + else: + return np.mean(se * 100) +# +# #x, y = np.random.random(size=(10,)), np.random.random(size=(10,)) +# x, y = np.random.randint(0, 10, (10,)), np.random.randint(0, 10, (10,)) +# weights = np.random.random(size=(10,)) +# x = mean_absolute_percentage_error(y_true=x, y_pred=y, weights=weights) +# print(x) + +def mean_squared_error(y_true: np.ndarray, + y_pred: np.ndarray, + weights: Optional[np.ndarray] = None) -> float: + + """ + Compute the Mean Squared Error (MSE) between the true and predicted values. + + :param np.ndarray y_true: The array containing the true values (dependent variable) of the dataset. Should be a 1D numeric array of shape (n,). + :param np.ndarray y_pred: The array containing the predicted values for the dataset. Should be a 1D numeric array of shape (n,) and of the same length as `y_true`. + :param Optional[np.ndarray] weights: An optional 1D array of weights to apply to each squared error. If provided, the weighted mean squared error is computed. + :return: The Mean Squared Error (MSE) as a float. A lower value indicates better model accuracy. 
+ :rtype: float + """ + + check_valid_array(data=y_true, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=y_pred, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0],accepted_dtypes=Formats.NUMERIC_DTYPES.value) + se = (y_true - y_pred) ** 2 + if weights is not None: + check_valid_array(data=weights, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + se = se * weights + return np.sum(se) / np.sum(weights) + else: + return np.mean(se) + +def mean_absolute_error(y_true: np.ndarray, + y_pred: np.ndarray, + weights: Optional[np.ndarray] = None) -> float: + + check_valid_array(data=y_true, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=y_pred, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + absolute_error = np.abs(y_true - y_pred) + if weights is not None: + check_valid_array(data=weights, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + absolute_error = absolute_error * weights + return np.sum(absolute_error) / np.sum(weights) + else: + return np.mean(absolute_error) + + +def r2_score(y_true: np.ndarray, y_pred: np.ndarray, weights: Optional[np.ndarray] = None) -> float: + """ + Compute the R^2 (coefficient of determination) score. + + :param np.ndarray y_true: 1D array of true values (dependent variable). + :param np.ndarray y_pred: 1D array of predicted values, same length as `y_true`. + :param np.ndarray weights: Optional 1D array of weights for each observation. + :return: The R^2 score as a float. A value closer to 1 indicates better fit. + :rtype: float + + """ + + check_valid_array(data=y_true, source=r2_score.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=y_pred, source=r2_score.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + + if weights is not None: + check_valid_array(data=weights, source=r2_score.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0]) + + y_mean = np.average(y_true, weights=weights) if weights is not None else np.mean(y_true) + residuals, total = (y_true - y_pred) ** 2, (y_true - y_mean) ** 2 + + if weights is not None: + ss_residual = np.sum(residuals * weights) + ss_total = np.sum(total * weights) + else: + ss_residual = np.sum(residuals) + ss_total = np.sum(total) + + return 1 - (ss_residual / ss_total) + + +def root_mean_squared_error(y_true: np.ndarray, + y_pred: np.ndarray, + weights: Optional[np.ndarray] = None) -> float: + + """ + Compute the Root Mean Squared Error (RMSE) between the true and predicted values. + + :param np.ndarray y_true: The array containing the true values (dependent variable) of the dataset. Should be a 1D numeric array of shape (n,). + :param np.ndarray y_pred: The array containing the predicted values for the dataset. Should be a 1D numeric array of shape (n,) and of the same length as `y_true`. + :param Optional[np.ndarray] weights: An optional 1D array of weights to apply to each squared error. If provided, the weighted mean squared error is computed. + :return: The Root Mean Squared Error (MSE) as a float. 
A lower value indicates better model accuracy. + :rtype: float + """ + + check_valid_array(data=y_true, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_valid_array(data=y_pred, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0],accepted_dtypes=Formats.NUMERIC_DTYPES.value) + se = (y_true - y_pred) ** 2 + if weights is not None: + check_valid_array(data=weights, source=mean_absolute_percentage_error.__name__, accepted_ndims=(1,), min_axis_0=y_true.shape[0], accepted_dtypes=Formats.NUMERIC_DTYPES.value) + weighted_mse = np.sum(se * weights) / np.sum(weights) + return np.sqrt(weighted_mse) + else: + return np.sqrt(np.mean(se)) + + + +#x, y = np.random.random(size=(10,)), np.random.random(size=(10,)) +#x, y = np.random.randint(0, 10, (10,)), np.random.randint(0, 10, (10,)) +# weights = np.random.random(size=(10,)) +# my_results = mean_squared_error(y_true=x, y_pred=y, weights=weights) +# sklearn_mse(y_true=x, y_pred=y, sample_weight=weights) + diff --git a/simba/sandbox/sliding_descriptive_statistics_cuda.py b/simba/sandbox/sliding_descriptive_statistics_cuda.py new file mode 100644 index 000000000..f4453a7f8 --- /dev/null +++ b/simba/sandbox/sliding_descriptive_statistics_cuda.py @@ -0,0 +1,50 @@ +import numpy as np +from numba import cuda +from typing import Tuple +try: + from typing import Literal +except: + from typing_extensions import Literal +from simba.utils.checks import check_valid_array, check_valid_tuple, check_float +from simba.utils.enums import Formats +from simba.data_processors.cuda.utils import _cuda_mean +import math + +def _cuda_variance(x: np.ndarray): + #mean = _cuda_mean(x=x) + mean = np.mean(x) + num = 0 + for i in range(x.shape[0]): + num += abs(x[i] - mean) + return num / (x.shape[0] - 1) + + + + + +def sliding_descriptive_statistics_cuda(data: np.ndarray, + window_size: float, + sample_rate: float, + statistics: Tuple[Literal["var", "max", "min", "std"]]): + + STATISTICS = ('var', 'max', 'min', 'std', 'median', 'mean', 'mad', 'sum', 'mac', 'rms', 'abs_energy') + check_valid_array(data=data, source=f'{sliding_descriptive_statistics_cuda.__name__} data', accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + check_float(name=f'{sliding_descriptive_statistics_cuda.__name__} window_size', value=window_size, min_value=10e-6) + check_float(name=f'{sliding_descriptive_statistics_cuda.__name__} sample_rate', value=sample_rate, min_value=10e-6) + check_valid_tuple(x=statistics, source=f'{sliding_descriptive_statistics_cuda.__name__} statistics', valid_dtypes=(str,)) #TODO: ACCESPTED ENTRIES + frm_win = np.array([max(1, int(window_size*sample_rate))]) + sV = np.zeros(shape=(11,), dtype=np.uint8) + for cnt, statistic in enumerate(STATISTICS): + if statistic in statistics: sV[cnt] = 1 + print(sV) + + + + +data = np.random.randint(0, 500, (100,)) +window_size = 1.5 +sliding_descriptive_statistics_cuda(data=data, window_size=window_size, sample_rate=10, statistics=('max', 'min')) + + +x = np.array([2.5, 5, 7.5, 10.0]) +_cuda_variance(x=x) diff --git a/simba/sandbox/sliding_mode.py b/simba/sandbox/sliding_mode.py new file mode 100644 index 000000000..9740e29a6 --- /dev/null +++ b/simba/sandbox/sliding_mode.py @@ -0,0 +1,7 @@ +import numpy as np +from numba import jit + + +def sliding_mode(x: np.ndarray, window_size: float, sample_rate: float): + window_frm_size = np.max((1.0, window_size*sample_rate)) + diff --git 
a/simba/sandbox/sliding_preferred_turning_direction.py b/simba/sandbox/sliding_preferred_turning_direction.py new file mode 100644 index 000000000..4ca9ab777 --- /dev/null +++ b/simba/sandbox/sliding_preferred_turning_direction.py @@ -0,0 +1,50 @@ +import numpy as np +from simba.utils.checks import check_valid_array, check_float +from simba.utils.enums import Formats +from simba.utils.errors import InvalidInputError +from simba.mixins.circular_statistics import CircularStatisticsMixin +from simba.utils.data import get_mode + + +def sliding_preferred_turning_direction(x: np.ndarray, + time_window: float, + sample_rate: float) -> np.ndarray: + """ + Computes the sliding preferred turning direction over a given time window from a 1D array of circular directional data. + + Calculates the most frequent turning direction (mode) within a sliding window of a specified duration. + + :param np.ndarray x: A 1D array of circular directional data (values between 0 and 360, inclusive). Each value represents an angular direction in degrees. + :param float time_window: The duration of the sliding window in seconds. + :param float sample_rate: The sampling rate of the data in Hz (samples per second) or FPS (frames per seconds) + :return: + A 1D array of integers indicating the preferred turning direction for each window: + - `0`: No change in angular values within the window. + - `1`: An increase in angular values (counterclockwise rotation). + - `2`: A decrease in angular values (clockwise rotation). + For indices before the first full window, the value is `-1`. + :rtype: np.ndarray + + :example: + >>> x = np.random.randint(0, 361, (213,)) + >>> sliding_preferred_turning_direction(x=x, time_window=1, sample_rate=10) + """ + check_valid_array(data=x, source=sliding_preferred_turning_direction.__name__, accepted_ndims=(1,), accepted_dtypes=Formats.NUMERIC_DTYPES.value) + if np.max(x) > 360 or np.min(x) < 0: + raise InvalidInputError(msg='x has to be values between 0 and 360 inclusive', source=sliding_preferred_turning_direction.__name__) + check_float(name=f'{sliding_preferred_turning_direction} time_window', value=time_window) + check_float(name=f'{sliding_preferred_turning_direction} sample_rate', value=sample_rate) + rotational_directions = CircularStatisticsMixin.rotational_direction(data=x.astype(np.float32)) + window_size = np.int64(np.max((1.0, (time_window*sample_rate)))) + results = np.full(shape=(x.shape[0]), fill_value=-1, dtype=np.int32) + for r in range(window_size, x.shape[0]+1): + l = r-window_size + sample = rotational_directions[l:r] + results[r-1] = get_mode(x=sample) + return results.astype(np.int32) + + + #return + +# x = np.random.randint(0, 361, (213,)) +# sliding_preferred_turning_direction(x=x, time_window=1, sample_rate=10) \ No newline at end of file diff --git a/simba/sandbox/snap.py b/simba/sandbox/snap.py new file mode 100644 index 000000000..a1d75d26f --- /dev/null +++ b/simba/sandbox/snap.py @@ -0,0 +1,53 @@ +import numpy as np +from simba.utils.read_write import read_df + +def snap(x: np.ndarray, sample_rate: float, pixels_per_mm: float): + """ + :example: + >>> x = read_df(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mitra/project_folder/csv/outlier_corrected_movement_location/FRR_gq_CNO_0625.csv', file_type='csv', usecols=['Nose_x', 'Nose_y']).values + >>> snap(x=x, sample_rate=30, pixels_per_mm=2.15) + """ + dt = 1 / sample_rate + velocity = np.gradient(x, axis=0) / dt + acceleration = np.gradient(velocity, axis=0) / dt + jerk = np.gradient(acceleration, 
axis=0) / dt + snap = np.gradient(jerk, axis=0) / dt + snap_magnitude = np.linalg.norm(snap, axis=1) / pixels_per_mm + return snap_magnitude + + +x = read_df(file_path=r'/Users/simon/Desktop/envs/simba/troubleshooting/mitra/project_folder/csv/outlier_corrected_movement_location/FRR_gq_CNO_0625.csv', file_type='csv', usecols=['Nose_x', 'Nose_y']).values +snap(x=x, sample_rate=30, pixels_per_mm=2.15) + +# +# x = np.random.randint(0, 500, (200, 2)) +# sample_rate = 10 +# pixels_per_mm = 3.4 +# snap(x=x, sample_rate=sample_rate, pixels_per_mm=pixels_per_mm) +# time = np.linspace(0, 10, 100) + + + + + + + +# # Example data +# time = np.linspace(0, 10, 100) # Time array (N,) +# x = np.sin(time) # Example x positions +# y = np.cos(time) # Example y positions +# +# # Combine into a 2D array of shape (N, 2) +# positions = np.column_stack((x, y)) +# +# # Compute derivatives +# velocity = np.gradient(positions, axis=0, edge_order=2) / np.gradient(time) # First derivative +# acceleration = np.gradient(velocity, axis=0, edge_order=2) / np.gradient(time) # Second derivative +# jerk = np.gradient(acceleration, axis=0, edge_order=2) / np.gradient(time) # Third derivative +# snap = np.gradient(jerk, axis=0, edge_order=2) / np.gradient(time) # Fourth derivative +# +# # Print some results +# print("Velocity (first 5):", velocity[:5]) +# print("Acceleration (first 5):", acceleration[:5]) +# print("Jerk (first 5):", jerk[:5]) +# print("Snap (first 5):", snap[:5]) diff --git a/simba/sandbox/spatial_matrix.py b/simba/sandbox/spatial_matrix.py new file mode 100644 index 000000000..a3dca7730 --- /dev/null +++ b/simba/sandbox/spatial_matrix.py @@ -0,0 +1,65 @@ +import numpy as np +import pysal +from pysal.explore import esda + + +def create_adjacency_matrix(N, M): + + def idx(r, c, M): + return r * M + c + adj_matrix = np.zeros((N * M, N * M)) + + for r in range(N): + for c in range(M): + current = idx(r, c, M) + if r > 0: + adj_matrix[current, idx(r - 1, c, M)] = 1 + if r < N - 1: + adj_matrix[current, idx(r + 1, c, M)] = 1 + if c > 0: + adj_matrix[current, idx(r, c - 1, M)] = 1 + if c < M - 1: + adj_matrix[current, idx(r, c + 1, M)] = 1 + if r > 0 and c > 0: + adj_matrix[current, idx(r - 1, c - 1, M)] = 1 + if r > 0 and c < M - 1: + adj_matrix[current, idx(r - 1, c + 1, M)] = 1 + if r < N - 1 and c > 0: + adj_matrix[current, idx(r + 1, c - 1, M)] = 1 + if r < N - 1 and c < M - 1: + adj_matrix[current, idx(r + 1, c + 1, M)] = 1 + + return adj_matrix + + +def morans_i(values, adj_matrix): + """Calculate Moran's I for spatial autocorrelation.""" + n = len(values) + mean_value = np.mean(values) # Mean of the values + num = 0 + denom = 0 + W = np.sum(adj_matrix) # Sum of weights in the adjacency matrix + + for i in range(n): + for j in range(n): + if adj_matrix[i, j] == 1: # If i and j are neighbors + num += (values[i] - mean_value) * (values[j] - mean_value) + denom += (values[i] - mean_value) ** 2 + + I = (n / W) * (num / denom) # Moran's I + return I + +# Example grid size (e.g., 3x3 grid) +N, M = 3, 3 + +# Example 2D grid values for each cell in the grid +values = np.array([[1, 2, 3], + [4, 5, 6], + [7, 8, 9]]) + +# Flatten the 2D grid into 1D array for Moran's I computation +flattened_values = values.flatten() + +adj_matrix = create_adjacency_matrix(N, M) + +I = morans_i(flattened_values, adj_matrix) \ No newline at end of file diff --git a/simba/sandbox/unsupervised/cluster_frequentist_stats.py b/simba/sandbox/unsupervised/cluster_frequentist_stats.py index 84e75a976..df7014a29 100644 --- 
a/simba/sandbox/unsupervised/cluster_frequentist_stats.py
+++ b/simba/sandbox/unsupervised/cluster_frequentist_stats.py
@@ -1,33 +1,35 @@
 import itertools
 import os
-from copy import deepcopy
 from typing import Union

 import numpy as np
 import pandas as pd
-from scipy.stats import f_oneway, kruskal
-from statsmodels.stats.libqsturng import psturng
-from statsmodels.stats.multicomp import pairwise_tukeyhsd
-
-from simba.mixins.train_model_mixin import TrainModelMixin
+from simba.mixins.statistics_mixin import Statistics
 from simba.utils.checks import (check_file_exist_and_readable,
                                 check_if_keys_exist_in_dict,
-                                check_valid_boolean)
+                                check_valid_boolean, check_if_dir_exists)
 from simba.utils.enums import UML
 from simba.utils.errors import InvalidInputError
-from simba.utils.printing import SimbaTimer
-from simba.utils.read_write import (find_files_of_filetypes_in_directory,
-                                    get_unique_values_in_iterable, read_pickle)
+from simba.utils.printing import SimbaTimer, stdout_success
+from simba.utils.read_write import (find_files_of_filetypes_in_directory, get_unique_values_in_iterable, read_pickle)


 class ClusterFrequentistCalculator():
     """
     Class for computing frequentist statistics based on cluster assignment labels for explainability purposes.

-    :param Union[str, os.PathLike] data_path: path to pickle or directory of pickles holding unsupervised results in ``simba.unsupervised.data_map.yaml`` format.
-    :param dict settings: Dict holding which statistical tests to use, with test name as keys and booleans as values.
+    :param Union[str, os.PathLike] data_path: Path to pickle or directory of pickles holding unsupervised results in ``simba.unsupervised.data_map.yaml`` format. Can be created by ...
+    :param Union[str, os.PathLike] save_path: Location where to store the results in MS Excel format.
+    :param bool scaled: If True, uses the scaled data (used to fit the model). Else, uses the raw feature bout values.
+    :param bool anova: If True, computes one-way ANOVAs.
+    :param bool kruskal_wallis: If True, computes Kruskal-Wallis comparisons.
+    :param bool tukey: If True, computes Tukey-HSD post-hoc cluster comparisons.
+    :param bool descriptive: If True, computes descriptive statistics (mean, sem, stdev, min, max) for each feature in each cluster.
+    :param bool pairwise: If True, computes one-way ANOVAs and Kruskal-Wallis comparisons where each cluster is compared to each of the other clusters. Else, compares each cluster against all other clusters pooled.

     :example:
+    >>> calculator = ClusterFrequentistCalculator(data_path=r"/Users/simon/Downloads/academic_elgamal.pickle", scaled=False, save_path=r"/Users/simon/Desktop/test.xlsx", pairwise=True, tukey=False)
+    >>> calculator.run()
     """

     def __init__(self,
                  data_path: Union[str, os.PathLike],
@@ -39,8 +41,9 @@ def __init__(self,
                  descriptive: bool = True,
                  pairwise: bool = True):
-
+        self.timer = SimbaTimer(start=True)
         check_valid_boolean(value=[scaled, anova, tukey, descriptive, pairwise], source=self.__class__.__name__)
+        check_if_dir_exists(in_dir=os.path.dirname(save_path), source=self.__class__.__name__)
         if len(list({scaled, anova, tukey, descriptive, kruskal_wallis})) == 1 and not list({scaled, anova, tukey, descriptive, kruskal_wallis})[0]:
             raise InvalidInputError(msg='No statistical tests chosen', source=self.__class__.__name__)
         if os.path.isdir(data_path):
@@ -49,6 +52,7 @@ def __init__(self,
             check_file_exist_and_readable(file_path=data_path)
             self.data_paths = [data_path]
         self.scaled, self.anova, self.tukey, self.descriptive, self.pairwise, self.kruskal = scaled, anova, tukey, descriptive, pairwise, kruskal_wallis
+        self.save_path = save_path
         with pd.ExcelWriter(save_path, mode="w") as writer:
             pd.DataFrame().to_excel(writer, sheet_name=" ", index=True)
@@ -63,63 +67,47 @@ def run(self):
             else:
                 features = data[UML.DATA.value][UML.SCALED_TRAIN_DATA.value].reset_index(drop=True)
             lbls = data[UML.CLUSTER_MODEL.value][UML.MODEL.value].labels_
-            cluster_cnt = get_unique_values_in_iterable(data=lbls, name=name, min=2)
+            _ = get_unique_values_in_iterable(data=lbls, name=name, min=2)
             unique_cluster_lbls = np.unique(lbls)
-            if self.pairwise:
-                cluster_comb = list(itertools.combinations(unique_cluster_lbls, 2))
-                cluster_comb = [(x[0], (x[1],)) for x in cluster_comb]
-            else:
-                cluster_comb = [(x, tuple(y for y in unique_cluster_lbls if y != x)) for x in unique_cluster_lbls]
-            anova_results, kruskal_results, tukey_results = [], [], []
-            for target, base in cluster_comb:
-                target_X = features.loc[np.argwhere(lbls == target).flatten()].values
-                non_target_X = features.loc[np.argwhere(np.isin(lbls, base)).flatten()].values
+            if self.anova or self.kruskal:
+                if self.pairwise:
+                    cluster_comb = list(itertools.combinations(unique_cluster_lbls, 2))
+                    cluster_comb = [(x[0], (x[1],)) for x in cluster_comb]
+                else:
+                    cluster_comb = [(x, tuple(y for y in unique_cluster_lbls if y != x)) for x in unique_cluster_lbls]
+                self.anova_results, self.kruskal_results = [], []
+                for target, nontargets in cluster_comb:
+                    target_X = features.loc[np.argwhere(lbls == target).flatten()].values
+                    non_target_X = features.loc[np.argwhere(np.isin(lbls, nontargets)).flatten()].values
+                    if self.anova:
+                        anova_df = Statistics.one_way_anova_scipy(x=target_X, y=non_target_X, variable_names=list(features.columns), x_name=str(target), y_name=str(nontargets))
+                        self.anova_results.append(anova_df)
+                    if self.kruskal:
+                        kruskal_df = Statistics.kruskal_scipy(x=target_X, y=non_target_X, variable_names=list(features.columns), x_name=str(target), y_name=str(nontargets))
+                        self.kruskal_results.append(kruskal_df)
                 if self.anova:
-                    anova_df = pd.DataFrame(features.columns, columns=['FEATURE'])
-                    anova_df[['GROUP_1', 'GROUP_2']] = target, str(base)
-                    anova_df['F-STATISTIC'], anova_df['P-VALUE'] = f_oneway(target_X, non_target_X)
-                    anova_results.append(anova_df)
+                    self.__save_results(df=pd.concat(self.anova_results, axis=0).reset_index(drop=True).sort_values(['GROUP_1', 'GROUP_2', 'P-VALUE']), name='ANOVA')
                 if self.kruskal:
-                    kruskal_df = pd.DataFrame(features.columns, columns=['FEATURE'])
-                    kruskal_df[['GROUP_1', 'GROUP_2']] = target, str(base)
-                    kruskal_df['STATISTIC'], kruskal_df['P-VALUE'] = kruskal(target_X, non_target_X)
-                    kruskal_results.append(kruskal_df)
+                    self.__save_results(df=pd.concat(self.kruskal_results, axis=0).reset_index(drop=True).sort_values(['GROUP_1', 'GROUP_2', 'P-VALUE']), name='KRUSKAL WALLIS')
             if self.tukey:
-                tukey_df = deepcopy(features)
-                tukey_df['lbls'] = lbls
-                for x in features.columns:
-                    data = pairwise_tukeyhsd(tukey_df[x], tukey_df['lbls'])
-                    df = pd.DataFrame(data=data._results_table.data[1:], columns=data._results_table.data[0])
-                    df['P-VALUE'] = psturng(np.abs(data.meandiffs / data.std_pairs), len(data.groupsunique), data.df_total)
-                    df['FEATURE'] = x
-                    tukey_results.append(tukey_df)
+                tukey_results = Statistics.pairwise_tukeyhsd_scipy(data=features.values, group=lbls, variable_names=list(features.columns), verbose=True)
+                self.__save_results(df=tukey_results, name='TUKEY_HSD')
             if self.descriptive:
-                for cluster_id in unique_cluster_lbls:
-                    target_X = features.loc[np.argwhere(lbls == target).flatten()].values
-
-                #print()
-                #anova_df['F-STATISTIC', 'P-VALUE'] = f_vals, p_vals
-
-                #= pair
-
-        #
-        # if self.settings[ANOVA]:
-        #     self.__one_way_anovas()
-
-
-calculator = ClusterFrequentistCalculator(data_path=r"C:\troubleshooting\nastacia_unsupervised\cluster_data\dreamy_moser.pickle", scaled=True, save_path=r"C:\troubleshooting\nastacia_unsupervised\cluster_statistics\stats.xlsx", pairwise=False)
-calculator.run()
+                def sem(x):
+                    return np.std(x, ddof=1) / np.sqrt(len(x))
+                features['GROUP'] = lbls
+                numeric_columns = [col for col in features.select_dtypes(include='number').columns if col != 'GROUP']
+                descriptive_result = features.groupby('GROUP').agg({col: ['mean', sem, 'std', 'min', 'max'] for col in numeric_columns})
+                descriptive_result.columns = [f'{col[0]}_{col[1]}' for col in descriptive_result.columns]
+                self.__save_results(df=descriptive_result, name='DESCRIPTIVE_STATISTICS')
+        self.timer.stop_timer()
+        stdout_success(msg=f'Cluster statistics saved at {self.save_path}', elapsed_time=self.timer.elapsed_time_str)
+
+    def __save_results(self, df: pd.DataFrame, name: str):
+        with pd.ExcelWriter(self.save_path, mode="a") as writer:
+            df.to_excel(writer, sheet_name=name, index=True)
+
+# calculator = ClusterFrequentistCalculator(data_path=r"/Users/simon/Downloads/academic_elgamal.pickle", scaled=False, save_path=r"/Users/simon/Desktop/test.xlsx", pairwise=True, tukey=False)
+# calculator.run()
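A minimal sketch (not part of the patch) of how the ``pairwise`` flag shapes the cluster comparisons that ``run()`` builds above: with ``pairwise=True`` every cluster is tested against each other cluster one at a time, otherwise each cluster is tested against the pool of all remaining clusters.

    import itertools

    unique_cluster_lbls = [0, 1, 2]  # stand-in for np.unique(lbls)

    # pairwise=True: one comparison per cluster pair.
    pairwise_comb = [(x[0], (x[1],)) for x in itertools.combinations(unique_cluster_lbls, 2)]
    print(pairwise_comb)   # [(0, (1,)), (0, (2,)), (1, (2,))]

    # pairwise=False: each cluster against all other clusters pooled together.
    one_vs_rest = [(x, tuple(y for y in unique_cluster_lbls if y != x)) for x in unique_cluster_lbls]
    print(one_vs_rest)     # [(0, (1, 2)), (1, (0, 2)), (2, (0, 1))]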
diff --git a/simba/sandbox/unsupervised/cluster_validation_metrics.py b/simba/sandbox/unsupervised/cluster_validation_metrics.py
new file mode 100644
index 000000000..d8f331564
--- /dev/null
+++ b/simba/sandbox/unsupervised/cluster_validation_metrics.py
@@ -0,0 +1,128 @@
+import os
+from itertools import combinations, product
+from typing import List, Union
+
+import pandas as pd
+
+try:
+    from typing import Literal
+except:
+    from typing_extensions import Literal
+
+from simba.mixins.config_reader import ConfigReader
+from simba.mixins.statistics_mixin import Statistics
+from simba.unsupervised.enums import Clustering, Unsupervised
+from simba.utils.checks import (check_file_exist_and_readable,
+                                check_if_dir_exists,
+                                check_if_keys_exist_in_dict, check_valid_lst)
+from simba.utils.enums import Formats
+from simba.utils.errors import CountError
+from simba.utils.printing import stdout_success
+from simba.utils.read_write import (find_files_of_filetypes_in_directory,
+                                    get_fn_ext, read_pickle)
+
+ADJ_MUTUAL_INFO = "adjusted mutual information"
+FOWLKES_MALLOWS = "fowlkes mallows"
+ADJ_RAND_INDEX = "adjusted rand index"
+STATS_OPTIONS = [ADJ_MUTUAL_INFO, FOWLKES_MALLOWS, ADJ_RAND_INDEX]
+
+
+class ClusterValidationMetricCalculator(ConfigReader):
+    """
+    Compute agreement statistics (adjusted mutual information, Fowlkes-Mallows, adjusted Rand index) between the
+    cluster assignments of two or more fitted cluster models, and save the results in MS Excel format.
+
+    :param Union[str, os.PathLike] config_path: Path to SimBA project config file.
+    :param Union[str, os.PathLike] data_dir: Directory holding two or more pickled cluster models fitted on the same data.
+    :param List[str] statistics: Names of the agreement statistics to compute. Any of ``STATS_OPTIONS``.
+    """
+
+    def __init__(self,
+                 config_path: Union[str, os.PathLike],
+                 data_dir: Union[str, os.PathLike],
+                 statistics: List[Literal[ADJ_MUTUAL_INFO, FOWLKES_MALLOWS, ADJ_RAND_INDEX]]):
+
+        ConfigReader.__init__(
+            self, config_path=config_path, read_video_info=False, create_logger=False
+        )
+        check_if_dir_exists(in_dir=data_dir)
+        check_file_exist_and_readable(file_path=config_path)
+        self.data_paths = find_files_of_filetypes_in_directory(
+            directory=data_dir, extensions=[f".{Formats.PICKLE.value}"]
+        )
+        if len(self.data_paths) < 2:
+            raise CountError(
+                msg=f"Cluster comparisons require at least two models. Found {len(self.data_paths)} in {data_dir}",
+                source=self.__class__.__name__,
+            )
+        check_valid_lst(
+            data=statistics,
+            source=self.__class__.__name__,
+            valid_dtypes=(str,),
+            valid_values=STATS_OPTIONS,
+        )
+        self.statistics, self.config_path, self.data_dir = (
+            statistics,
+            config_path,
+            data_dir,
+        )
+        self.save_path = os.path.join(
+            self.logs_path,
+            f"clusterer_comparisons_{self.datetime}.{Formats.XLXS.value}",
+        )
+        with pd.ExcelWriter(self.save_path, mode="w") as writer:
+            pd.DataFrame().to_excel(writer, sheet_name=" ", index=True)
+
+    def __save_results(self, df: pd.DataFrame, name: str):
+        with pd.ExcelWriter(self.save_path, mode="a") as writer:
+            df.to_excel(writer, sheet_name=name, index=True)
+
+    def run(self):
+        self.data, obs_cnts = {}, {}
+        for file_cnt, file_path in enumerate(self.data_paths):
+            self.data_ = read_pickle(data_path=file_path, verbose=True)
+            check_if_keys_exist_in_dict(
+                data=self.data_,
+                key=[Unsupervised.METHODS.value, Clustering.CLUSTER_MODEL.value],
+                name=file_path,
+            )
+            mdl_name = self.data_[Clustering.CLUSTER_MODEL.value][
+                Unsupervised.HASHED_NAME.value
+            ]
+            cluster_data = self.data_[Clustering.CLUSTER_MODEL.value][
+                Unsupervised.MODEL.value
+            ].labels_
+            self.data[mdl_name] = cluster_data
+            obs_cnts[mdl_name] = cluster_data.shape[0]
+
+        counts = list(set([v for k, v in obs_cnts.items()]))
+        if len(counts) > 1:
+            raise CountError(
+                msg=f"Cluster comparisons require models built on the same data. Found different numbers of observations in the models in {self.data_dir}: {obs_cnts}",
+                source=self.__class__.__name__,
+            )
+
+        for statistic in self.statistics:
+            results = {}
+            for i in self.data.keys():
+                print(f"Computing {statistic} statistics for {i}...")
+                results[i] = {}
+                for j in self.data.keys():
+                    x, y = self.data[i], self.data[j]
+                    if statistic == ADJ_RAND_INDEX:
+                        s = Statistics.adjusted_rand(x=x, y=y)
+                    elif statistic == ADJ_MUTUAL_INFO:
+                        s = Statistics.adjusted_mutual_info(x=x, y=y)
+                    else:
+                        s = Statistics.fowlkes_mallows(x=x, y=y)
+                    results[i][j] = s
+            df = pd.DataFrame.from_dict(results, orient="index")
+            self.__save_results(df=df, name=statistic)
+
+        self.timer.stop_timer()
+        stdout_success(
+            f"Cluster comparisons saved at {self.save_path}",
+            elapsed_time=self.timer.elapsed_time_str,
+        )
+
+
+# x = ClusterValidationMetricCalculator(config_path="/Users/simon/Desktop/envs/NG_Unsupervised/project_folder/project_config.ini", data_dir='/Users/simon/Desktop/envs/NG_Unsupervised/project_folder/clusters', statistics=STATS_OPTIONS)
+# x.run()
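For orientation, all three agreement statistics compared in ``ClusterValidationMetricCalculator`` score how well two cluster assignments over the same observations agree while ignoring the actual label ids. A small sketch using the scikit-learn equivalents (an assumption: the ``Statistics.adjusted_rand``, ``Statistics.adjusted_mutual_info`` and ``Statistics.fowlkes_mallows`` helpers are taken to follow these standard definitions):

    import numpy as np
    from sklearn.metrics import (adjusted_mutual_info_score, adjusted_rand_score,
                                 fowlkes_mallows_score)

    labels_a = np.array([0, 0, 0, 1, 1, 1])   # model A cluster labels
    labels_b = np.array([3, 3, 3, 2, 2, 2])   # model B: same grouping, different label ids
    labels_c = np.array([0, 1, 0, 1, 0, 1])   # model C: unrelated grouping

    print(adjusted_rand_score(labels_a, labels_b))         # 1.0 (identical partitions)
    print(adjusted_mutual_info_score(labels_a, labels_b))  # 1.0
    print(fowlkes_mallows_score(labels_a, labels_b))       # 1.0
    print(adjusted_rand_score(labels_a, labels_c))         # ~0 (no agreement beyond chance)

All three measures are symmetric in their two arguments, which is why each statistic is written out as a square model-by-model matrix.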