From eec978a7825d06b4ea1cf7bca53e5f3b36a51868 Mon Sep 17 00:00:00 2001 From: Olivier Winter Date: Thu, 14 Sep 2023 12:06:17 +0100 Subject: [PATCH] Copy (#503) * WIP recover from partial copy multiple devices * copier hierarchy * Update transfer_experiments.py * Update transfer_experiments.py * modify acquisition description aggregation level from copiers * add the script to copy video data / purge sessions older than 2 weeks * Fix chained protocols bug * bugfix: the criteria to end the task should not factor in the delay period --------- Co-authored-by: Florian Rau --- iblrig/base_tasks.py | 6 +-- iblrig/commands.py | 42 +++++++++++------ .../cameras/body_left_right.yaml | 21 ++++----- iblrig/device_descriptions/cameras/left.yaml | 9 ++-- .../microphone/microphone.yaml | 7 +-- .../neuropixel/dual_probe.yaml | 15 ++++--- .../neuropixel/single_probe.yaml | 9 ++-- iblrig/online_plots.py | 3 +- iblrig/path_helper.py | 14 ++++++ iblrig/test/test_transfers.py | 25 +++++++---- iblrig/transfer_experiments.py | 45 +++++++++++++++---- 11 files changed, 133 insertions(+), 63 deletions(-) diff --git a/iblrig/base_tasks.py b/iblrig/base_tasks.py index f3f15be6c..5816f3bf8 100644 --- a/iblrig/base_tasks.py +++ b/iblrig/base_tasks.py @@ -38,7 +38,7 @@ import iblrig.graphic as graph from iblrig.version_management import check_for_updates import ibllib.io.session_params as ses_params -from iblrig.transfer_experiments import SessionCopier +from iblrig.transfer_experiments import BehaviorCopier OSC_CLIENT_IP = "127.0.0.1" @@ -372,8 +372,8 @@ def create_session(self): logfile = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_ibl_log.info-acquisition.log') self._setup_loggers(level=self.logger.level, file=logfile) # copy the acquisition stub to the remote session folder - sc = SessionCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER']) - sc.initialize_experiment(self.experiment_description) + sc = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER']) + sc.initialize_experiment(self.experiment_description, overwrite=False) self.register_to_alyx() def run(self): diff --git a/iblrig/commands.py b/iblrig/commands.py index a9e2b11f2..d94a869e6 100644 --- a/iblrig/commands.py +++ b/iblrig/commands.py @@ -7,33 +7,46 @@ from iblutil.util import setup_logger from ibllib.io import raw_data_loaders -from iblrig.transfer_experiments import SessionCopier +from iblrig.transfer_experiments import BehaviorCopier, VideoCopier import iblrig from iblrig.hardware import Bpod -from iblrig.path_helper import load_settings_yaml +from iblrig.path_helper import load_settings_yaml, get_local_and_remote_paths from iblrig.online_plots import OnlinePlots from iblrig.raw_data_loaders import load_task_jsonable logger = setup_logger('iblrig', level='INFO') +def transfer_video_data(local_subjects_path=None, remote_subjects_path=None, dry=False): + local_subjects_path, remote_subjects_path = get_local_and_remote_paths( + local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path) + + for flag in list(local_subjects_path.rglob('transfer_me.flag')): + session_path = flag.parent + vc = VideoCopier(session_path, remote_subjects_folder=remote_subjects_path) + logger.critical(f"{vc.state}, {vc.session_path}") + if not dry: + vc.run() + remove_local_sessions(weeks=2, local_subjects_path=local_subjects_path, + remote_subjects_path=remote_subjects_path, dry=dry, tag='video') + + def transfer_data(local_subjects_path=None, remote_subjects_path=None, dry=False): """ - Copies the data from the rig to the local server if the session has more than 42 trials + Copies the behavior data from the rig to the local server if the session has more than 42 trials If the hardware settings file contains MAIN_SYNC=True, the number of expected devices is set to 1 :param weeks: :param dry: :return: """ - iblrig_settings = load_settings_yaml() + local_subjects_path, remote_subjects_path = get_local_and_remote_paths( + local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path) hardware_settings = load_settings_yaml('hardware_settings.yaml') - local_subjects_path = local_subjects_path or Path(iblrig_settings['iblrig_local_data_path']).joinpath('Subjects') - remote_subjects_path = remote_subjects_path or Path(iblrig_settings['iblrig_remote_data_path']).joinpath('Subjects') number_of_expected_devices = 1 if hardware_settings.get('MAIN_SYNC', True) else None for flag in list(local_subjects_path.rglob('transfer_me.flag')): session_path = flag.parent - sc = SessionCopier(session_path, remote_subjects_folder=remote_subjects_path) + sc = BehaviorCopier(session_path, remote_subjects_folder=remote_subjects_path) task_settings = raw_data_loaders.load_settings(session_path, task_collection='raw_task_data_00') if task_settings is None: logger.info(f'skipping: no task settings found for {session_path}') @@ -76,24 +89,25 @@ def transfer_data(local_subjects_path=None, remote_subjects_path=None, dry=False remove_local_sessions(weeks=2, dry=dry, local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path) -def remove_local_sessions(weeks=2, local_subjects_path=None, remote_subjects_path=None, dry=False): +def remove_local_sessions(weeks=2, local_subjects_path=None, remote_subjects_path=None, dry=False, tag='behavior'): """ Remove local sessions older than 2 weeks :param weeks: :param dry: :return: """ - iblrig_settings = load_settings_yaml() - local_subjects_path = local_subjects_path or Path(iblrig_settings['iblrig_local_data_path']) - remote_subjects_path = remote_subjects_path or Path(iblrig_settings['iblrig_remote_data_path']).joinpath('Subjects') - + local_subjects_path, remote_subjects_path = get_local_and_remote_paths( + local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path) size = 0 - for flag in sorted(list(local_subjects_path.rglob('_ibl_experiment.description_behavior.yaml')), reverse=True): + match tag: + case 'behavior': Copier = BehaviorCopier + case 'video': Copier = VideoCopier + for flag in sorted(list(local_subjects_path.rglob(f'_ibl_experiment.description_{tag}.yaml')), reverse=True): session_path = flag.parent days_elapsed = (datetime.datetime.now() - datetime.datetime.strptime(session_path.parts[-2], '%Y-%m-%d')).days if days_elapsed < (weeks * 7): continue - sc = SessionCopier(session_path, remote_subjects_folder=remote_subjects_path) + sc = Copier(session_path, remote_subjects_folder=remote_subjects_path) if sc.state == 3: session_size = sum(f.stat().st_size for f in session_path.rglob('*') if f.is_file()) / 1024 ** 3 logger.info(f"{sc.session_path}, {session_size:0.02f} Go") diff --git a/iblrig/device_descriptions/cameras/body_left_right.yaml b/iblrig/device_descriptions/cameras/body_left_right.yaml index 7a510cfac..b6a497033 100644 --- a/iblrig/device_descriptions/cameras/body_left_right.yaml +++ b/iblrig/device_descriptions/cameras/body_left_right.yaml @@ -1,10 +1,11 @@ -cameras: - body: - collection: raw_video_data - sync_label: frame2ttl - left: - collection: raw_video_data - sync_label: frame2ttl - right: - collection: raw_video_data - sync_label: frame2ttl +devices: + cameras: + body: + collection: raw_video_data + sync_label: frame2ttl + left: + collection: raw_video_data + sync_label: frame2ttl + right: + collection: raw_video_data + sync_label: frame2ttl diff --git a/iblrig/device_descriptions/cameras/left.yaml b/iblrig/device_descriptions/cameras/left.yaml index ec1e4718f..6132ffe71 100644 --- a/iblrig/device_descriptions/cameras/left.yaml +++ b/iblrig/device_descriptions/cameras/left.yaml @@ -1,4 +1,5 @@ -cameras: - left: - collection: raw_video_data - sync_label: frame2ttl +devices: + cameras: + left: + collection: raw_video_data + sync_label: frame2ttl \ No newline at end of file diff --git a/iblrig/device_descriptions/microphone/microphone.yaml b/iblrig/device_descriptions/microphone/microphone.yaml index b8d2e4928..bb85bb80b 100644 --- a/iblrig/device_descriptions/microphone/microphone.yaml +++ b/iblrig/device_descriptions/microphone/microphone.yaml @@ -1,4 +1,5 @@ -microphone: +devices: microphone: - collection: raw_behavior_data - sync_label: null + microphone: + collection: raw_behavior_data + sync_label: null \ No newline at end of file diff --git a/iblrig/device_descriptions/neuropixel/dual_probe.yaml b/iblrig/device_descriptions/neuropixel/dual_probe.yaml index ac687013b..7f6d6111c 100644 --- a/iblrig/device_descriptions/neuropixel/dual_probe.yaml +++ b/iblrig/device_descriptions/neuropixel/dual_probe.yaml @@ -1,7 +1,8 @@ -neuropixel: - probe00: - collection: raw_ephys_data/probe00 - sync_label: imec_sync - probe01: - collection: raw_ephys_data/probe01 - sync_label: imec_sync +devices: + neuropixel: + probe00: + collection: raw_ephys_data/probe00 + sync_label: imec_sync + probe01: + collection: raw_ephys_data/probe01 + sync_label: imec_sync \ No newline at end of file diff --git a/iblrig/device_descriptions/neuropixel/single_probe.yaml b/iblrig/device_descriptions/neuropixel/single_probe.yaml index aef18e4e9..4c14ba6d2 100644 --- a/iblrig/device_descriptions/neuropixel/single_probe.yaml +++ b/iblrig/device_descriptions/neuropixel/single_probe.yaml @@ -1,4 +1,5 @@ -neuropixel: - probe00: - collection: raw_ephys_data/probe00 - sync_label: imec_sync +devices: + neuropixel: + probe00: + collection: raw_ephys_data/probe00 + sync_label: imec_sync \ No newline at end of file diff --git a/iblrig/online_plots.py b/iblrig/online_plots.py index f89f44782..008d8fa03 100644 --- a/iblrig/online_plots.py +++ b/iblrig/online_plots.py @@ -70,7 +70,8 @@ def __init__(self, task_file): self.ntrials_engaged = 0 # those are the trials happening within the first 400s else: trials_table, bpod_data = load_task_jsonable(task_file) - self.time_elapsed = bpod_data[-1]['Trial end timestamp'] - bpod_data[-1]['Bpod start timestamp'] + # here we take the end time of the first trial as reference to avoid factoring in the delay + self.time_elapsed = bpod_data[-1]['Trial end timestamp'] - bpod_data[0]['Trial end timestamp'] trials_table['signed_contrast'] = np.sign(trials_table['position']) * trials_table['contrast'] trials_table['choice'] = trials_table['position'] > 0 trials_table.loc[~trials_table.trial_correct, 'choice'] = ~trials_table['choice'][~trials_table.trial_correct] diff --git a/iblrig/path_helper.py b/iblrig/path_helper.py index 2bdc7e6ec..b990e6094 100644 --- a/iblrig/path_helper.py +++ b/iblrig/path_helper.py @@ -16,6 +16,20 @@ log = logging.getLogger("iblrig") +def get_local_and_remote_paths(local_subjects_path=None, remote_subjects_path=None): + """ + Function used to parse input arguments to transfer commands. If the arguments are None, reads in the settings + and returns the values from the files, otherwise + :param local_subjects_path: + :param remote_subjects_path: + :return: + """ + iblrig_settings = load_settings_yaml() + local_subjects_path = local_subjects_path or Path(iblrig_settings['iblrig_local_data_path']) + remote_subjects_path = remote_subjects_path or Path(iblrig_settings['iblrig_remote_data_path']).joinpath('Subjects') + return local_subjects_path, remote_subjects_path + + def load_settings_yaml(file_name='iblrig_settings.yaml', mode='raise'): """ Load a yaml file from the settings folder. diff --git a/iblrig/test/test_transfers.py b/iblrig/test/test_transfers.py index b11df69bf..ee21cb13e 100644 --- a/iblrig/test/test_transfers.py +++ b/iblrig/test/test_transfers.py @@ -4,9 +4,11 @@ import tempfile import unittest +from ibllib.io import session_params + from iblrig_tasks._iblrig_tasks_trainingChoiceWorld.task import Session from iblrig.test.base import TASK_KWARGS -from iblrig.transfer_experiments import SessionCopier, VideoCopier, EphysCopier +from iblrig.transfer_experiments import BehaviorCopier, VideoCopier, EphysCopier import iblrig.commands import iblrig.raw_data_loaders @@ -57,8 +59,8 @@ def test_behavior_copy_complete_session(self): session.paths.SESSION_FOLDER.joinpath('transfer_me.flag').touch() iblrig.commands.transfer_data(local_subjects_path=session.paths.LOCAL_SUBJECT_FOLDER, remote_subjects_path=session.paths.REMOTE_SUBJECT_FOLDER) - sc = SessionCopier(session_path=session.paths.SESSION_FOLDER, - remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) + sc = BehaviorCopier(session_path=session.paths.SESSION_FOLDER, + remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) self.assertEqual(sc.state, 3) def test_behavior_do_not_copy_dummy_sessions(self): @@ -75,7 +77,7 @@ def test_behavior_do_not_copy_dummy_sessions(self): local_subjects_path=session.paths.LOCAL_SUBJECT_FOLDER, remote_subjects_path=session.paths.REMOTE_SUBJECT_FOLDER ) - sc = SessionCopier( + sc = BehaviorCopier( session_path=session.paths.SESSION_FOLDER, remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) self.assertFalse(sc.remote_session_path.exists()) @@ -83,14 +85,14 @@ def test_behavior_do_not_copy_dummy_sessions(self): class TestUnitTransferExperiments(unittest.TestCase): """ - UnitTest the SessionCopier, VideoCopier and EphysCopier classes and methods + UnitTest the BehaviorCopier, VideoCopier and EphysCopier classes and methods Unlike the integration test, the sessions here are made from scratch using an actual instantiated session """ def test_behavior_copy(self): with tempfile.TemporaryDirectory() as td: session = _create_behavior_session(td) - sc = SessionCopier(session_path=session.paths.SESSION_FOLDER, - remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) + sc = BehaviorCopier(session_path=session.paths.SESSION_FOLDER, + remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) assert sc.state == 1 sc.copy_collections() assert sc.state == 2 @@ -141,8 +143,8 @@ def test_behavior_ephys_video_copy(self): """ Test the copiers """ - sc = SessionCopier(session_path=session.paths.SESSION_FOLDER, - remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) + sc = BehaviorCopier(session_path=session.paths.SESSION_FOLDER, + remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) assert sc.glob_file_remote_copy_status().suffix == '.status_pending' assert sc.state == 1 sc.copy_collections() @@ -154,6 +156,7 @@ def test_behavior_ephys_video_copy(self): assert sc.state == 2 # here we still don't have all devides so this won't cut it and we stay in state 2 vc = VideoCopier(session_path=folder_session_video, remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER) + vc.create_video_stub() assert vc.state == 0 vc.initialize_experiment() assert vc.state == 1 @@ -176,3 +179,7 @@ def test_behavior_ephys_video_copy(self): # this time it's all there and we move on sc.finalize_copy(number_of_expected_devices=3) assert sc.state == 3 + final_experiment_description = session_params.read_params(sc.remote_session_path) + assert len(final_experiment_description['tasks']) == 1 + assert set(final_experiment_description['devices']['cameras'].keys()) == set(['body', 'left', 'right']) + assert set(final_experiment_description['sync'].keys()) == set(['nidq']) diff --git a/iblrig/transfer_experiments.py b/iblrig/transfer_experiments.py index 88f50f7ee..4a9dd1e89 100644 --- a/iblrig/transfer_experiments.py +++ b/iblrig/transfer_experiments.py @@ -1,3 +1,4 @@ +import abc from pathlib import Path import shutil import traceback @@ -10,9 +11,9 @@ log = setup_logger('iblrig', level='INFO') -class SessionCopier(): - tag = 'behavior' +class SessionCopier(abc.ABC): assert_connect_on_init = False + _experiment_description = None def __init__(self, session_path, remote_subjects_folder=None, tag=None): self.tag = tag or self.tag @@ -36,10 +37,10 @@ def run(self, number_of_expected_devices=None): if self.state == -1: # this case is not implemented automatically and corresponds to a hard reset log.info(f"{self.state}, {self.session_path}") shutil.rmtree(self.remote_session_path) - self.initialize_experiment(session_params.read_params(self.session_path)) + self.initialize_experiment() if self.state == 0: # the session hasn't even been initialzed: copy the stub to the remote log.info(f"{self.state}, {self.session_path}") - self.initialize_experiment(session_params.read_params(self.session_path)) + self.initialize_experiment() if self.state == 1: # the session log.info(f"{self.state}, {self.session_path}") self.copy_collections() @@ -76,7 +77,7 @@ def get_state(self): @property def experiment_description(self): - return session_params.read_params(self.session_path) + return self._experiment_description @property def remote_session_path(self): @@ -151,7 +152,7 @@ def copy_collections(self): self.session_path.joinpath('transfer_me.flag').unlink() return status - def initialize_experiment(self, acquisition_description=None, overwrite=False): + def initialize_experiment(self, acquisition_description=None, overwrite=True): """ Copy acquisition description yaml to the server and local transfers folder. @@ -162,6 +163,9 @@ def initialize_experiment(self, acquisition_description=None, overwrite=False): overwrite : bool If true, overwrite any existing file with the new one, otherwise, update the existing file. """ + if acquisition_description is None: + acquisition_description = self.experiment_description + assert acquisition_description # First attempt to add the remote description stub to the _device folder on the remote session @@ -174,6 +178,8 @@ def initialize_experiment(self, acquisition_description=None, overwrite=False): try: merged_description = session_params.merge_params(previous_description, acquisition_description) session_params.write_yaml(remote_stub_file, merged_description) + for f in remote_stub_file.parent.glob(remote_stub_file.stem + '.status_*'): + f.unlink() remote_stub_file.with_suffix('.status_pending').touch() log.info(f'Written data to remote device at: {remote_stub_file}.') except Exception as e: @@ -219,13 +225,35 @@ class VideoCopier(SessionCopier): tag = 'video' assert_connect_on_init = True + def create_video_stub(self, nvideos=None): + match len(list(self.session_path.joinpath('raw_video_data').glob('*.avi'))): + case 3: + stub_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'cameras', + 'body_left_right.yaml') + case 1: + stub_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'cameras', 'left.yaml') + acquisition_description = session_params.read_params(stub_file) + session_params.write_params(self.session_path, acquisition_description) + def initialize_experiment(self, acquisition_description=None, **kwargs): if not acquisition_description: - stub_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'cameras', 'body_left_right.yaml') - acquisition_description = session_params.read_params(stub_file) + # creates the acquisition description stub if not found, and then read it + if not self.file_experiment_description.exists(): + self.create_video_stub() + acquisition_description = session_params.read_params(self.file_experiment_description) + self._experiment_description = acquisition_description super(VideoCopier, self).initialize_experiment(acquisition_description=acquisition_description, **kwargs) +class BehaviorCopier(SessionCopier): + tag = 'behavior' + assert_connect_on_init = False + + @property + def experiment_description(self): + return session_params.read_params(self.session_path) + + class EphysCopier(SessionCopier): tag = 'spikeglx' assert_connect_on_init = True @@ -240,6 +268,7 @@ def initialize_experiment(self, acquisition_description=None, nprobes=None, **kw sync_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'sync', 'nidq.yaml') acquisition_description = session_params.read_params(stub_file) acquisition_description.update(session_params.read_params(sync_file)) + self._experiment_description = acquisition_description super(EphysCopier, self).initialize_experiment(acquisition_description=acquisition_description, **kwargs) def _copy_collections(self):