Skip to content

Commit

Permalink
Copy (#503)
Browse files Browse the repository at this point in the history
* WIP recover from partial copy multiple devices

* copier hierarchy

* Update transfer_experiments.py

* Update transfer_experiments.py

* modify acquisition description aggregation level from copiers

* add the script to copy video data / purge sessions older than 2 weeks

* Fix chained protocols bug

* bugfix: the criteria to end the task should not factor in the delay period

---------

Co-authored-by: Florian Rau <[email protected]>
  • Loading branch information
oliche and bimac committed Sep 15, 2023
1 parent a51211b commit eec978a
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 63 deletions.
6 changes: 3 additions & 3 deletions iblrig/base_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import iblrig.graphic as graph
from iblrig.version_management import check_for_updates
import ibllib.io.session_params as ses_params
from iblrig.transfer_experiments import SessionCopier
from iblrig.transfer_experiments import BehaviorCopier

OSC_CLIENT_IP = "127.0.0.1"

Expand Down Expand Up @@ -372,8 +372,8 @@ def create_session(self):
logfile = self.paths.SESSION_RAW_DATA_FOLDER.joinpath('_ibl_log.info-acquisition.log')
self._setup_loggers(level=self.logger.level, file=logfile)
# copy the acquisition stub to the remote session folder
sc = SessionCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
sc.initialize_experiment(self.experiment_description)
sc = BehaviorCopier(self.paths.SESSION_FOLDER, remote_subjects_folder=self.paths['REMOTE_SUBJECT_FOLDER'])
sc.initialize_experiment(self.experiment_description, overwrite=False)
self.register_to_alyx()

def run(self):
Expand Down
42 changes: 28 additions & 14 deletions iblrig/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,46 @@

from iblutil.util import setup_logger
from ibllib.io import raw_data_loaders
from iblrig.transfer_experiments import SessionCopier
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier
import iblrig
from iblrig.hardware import Bpod
from iblrig.path_helper import load_settings_yaml
from iblrig.path_helper import load_settings_yaml, get_local_and_remote_paths
from iblrig.online_plots import OnlinePlots
from iblrig.raw_data_loaders import load_task_jsonable

logger = setup_logger('iblrig', level='INFO')


def transfer_video_data(local_subjects_path=None, remote_subjects_path=None, dry=False):
local_subjects_path, remote_subjects_path = get_local_and_remote_paths(
local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path)

for flag in list(local_subjects_path.rglob('transfer_me.flag')):
session_path = flag.parent
vc = VideoCopier(session_path, remote_subjects_folder=remote_subjects_path)
logger.critical(f"{vc.state}, {vc.session_path}")
if not dry:
vc.run()
remove_local_sessions(weeks=2, local_subjects_path=local_subjects_path,
remote_subjects_path=remote_subjects_path, dry=dry, tag='video')


def transfer_data(local_subjects_path=None, remote_subjects_path=None, dry=False):
"""
Copies the data from the rig to the local server if the session has more than 42 trials
Copies the behavior data from the rig to the local server if the session has more than 42 trials
If the hardware settings file contains MAIN_SYNC=True, the number of expected devices is set to 1
:param weeks:
:param dry:
:return:
"""
iblrig_settings = load_settings_yaml()
local_subjects_path, remote_subjects_path = get_local_and_remote_paths(
local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path)
hardware_settings = load_settings_yaml('hardware_settings.yaml')
local_subjects_path = local_subjects_path or Path(iblrig_settings['iblrig_local_data_path']).joinpath('Subjects')
remote_subjects_path = remote_subjects_path or Path(iblrig_settings['iblrig_remote_data_path']).joinpath('Subjects')
number_of_expected_devices = 1 if hardware_settings.get('MAIN_SYNC', True) else None

for flag in list(local_subjects_path.rglob('transfer_me.flag')):
session_path = flag.parent
sc = SessionCopier(session_path, remote_subjects_folder=remote_subjects_path)
sc = BehaviorCopier(session_path, remote_subjects_folder=remote_subjects_path)
task_settings = raw_data_loaders.load_settings(session_path, task_collection='raw_task_data_00')
if task_settings is None:
logger.info(f'skipping: no task settings found for {session_path}')
Expand Down Expand Up @@ -76,24 +89,25 @@ def transfer_data(local_subjects_path=None, remote_subjects_path=None, dry=False
remove_local_sessions(weeks=2, dry=dry, local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path)


def remove_local_sessions(weeks=2, local_subjects_path=None, remote_subjects_path=None, dry=False):
def remove_local_sessions(weeks=2, local_subjects_path=None, remote_subjects_path=None, dry=False, tag='behavior'):
"""
Remove local sessions older than 2 weeks
:param weeks:
:param dry:
:return:
"""
iblrig_settings = load_settings_yaml()
local_subjects_path = local_subjects_path or Path(iblrig_settings['iblrig_local_data_path'])
remote_subjects_path = remote_subjects_path or Path(iblrig_settings['iblrig_remote_data_path']).joinpath('Subjects')

local_subjects_path, remote_subjects_path = get_local_and_remote_paths(
local_subjects_path=local_subjects_path, remote_subjects_path=remote_subjects_path)
size = 0
for flag in sorted(list(local_subjects_path.rglob('_ibl_experiment.description_behavior.yaml')), reverse=True):
match tag:
case 'behavior': Copier = BehaviorCopier
case 'video': Copier = VideoCopier
for flag in sorted(list(local_subjects_path.rglob(f'_ibl_experiment.description_{tag}.yaml')), reverse=True):
session_path = flag.parent
days_elapsed = (datetime.datetime.now() - datetime.datetime.strptime(session_path.parts[-2], '%Y-%m-%d')).days
if days_elapsed < (weeks * 7):
continue
sc = SessionCopier(session_path, remote_subjects_folder=remote_subjects_path)
sc = Copier(session_path, remote_subjects_folder=remote_subjects_path)
if sc.state == 3:
session_size = sum(f.stat().st_size for f in session_path.rglob('*') if f.is_file()) / 1024 ** 3
logger.info(f"{sc.session_path}, {session_size:0.02f} Go")
Expand Down
21 changes: 11 additions & 10 deletions iblrig/device_descriptions/cameras/body_left_right.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
cameras:
body:
collection: raw_video_data
sync_label: frame2ttl
left:
collection: raw_video_data
sync_label: frame2ttl
right:
collection: raw_video_data
sync_label: frame2ttl
devices:
cameras:
body:
collection: raw_video_data
sync_label: frame2ttl
left:
collection: raw_video_data
sync_label: frame2ttl
right:
collection: raw_video_data
sync_label: frame2ttl
9 changes: 5 additions & 4 deletions iblrig/device_descriptions/cameras/left.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cameras:
left:
collection: raw_video_data
sync_label: frame2ttl
devices:
cameras:
left:
collection: raw_video_data
sync_label: frame2ttl
7 changes: 4 additions & 3 deletions iblrig/device_descriptions/microphone/microphone.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
microphone:
devices:
microphone:
collection: raw_behavior_data
sync_label: null
microphone:
collection: raw_behavior_data
sync_label: null
15 changes: 8 additions & 7 deletions iblrig/device_descriptions/neuropixel/dual_probe.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
neuropixel:
probe00:
collection: raw_ephys_data/probe00
sync_label: imec_sync
probe01:
collection: raw_ephys_data/probe01
sync_label: imec_sync
devices:
neuropixel:
probe00:
collection: raw_ephys_data/probe00
sync_label: imec_sync
probe01:
collection: raw_ephys_data/probe01
sync_label: imec_sync
9 changes: 5 additions & 4 deletions iblrig/device_descriptions/neuropixel/single_probe.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
neuropixel:
probe00:
collection: raw_ephys_data/probe00
sync_label: imec_sync
devices:
neuropixel:
probe00:
collection: raw_ephys_data/probe00
sync_label: imec_sync
3 changes: 2 additions & 1 deletion iblrig/online_plots.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def __init__(self, task_file):
self.ntrials_engaged = 0 # those are the trials happening within the first 400s
else:
trials_table, bpod_data = load_task_jsonable(task_file)
self.time_elapsed = bpod_data[-1]['Trial end timestamp'] - bpod_data[-1]['Bpod start timestamp']
# here we take the end time of the first trial as reference to avoid factoring in the delay
self.time_elapsed = bpod_data[-1]['Trial end timestamp'] - bpod_data[0]['Trial end timestamp']
trials_table['signed_contrast'] = np.sign(trials_table['position']) * trials_table['contrast']
trials_table['choice'] = trials_table['position'] > 0
trials_table.loc[~trials_table.trial_correct, 'choice'] = ~trials_table['choice'][~trials_table.trial_correct]
Expand Down
14 changes: 14 additions & 0 deletions iblrig/path_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@
log = logging.getLogger("iblrig")


def get_local_and_remote_paths(local_subjects_path=None, remote_subjects_path=None):
"""
Function used to parse input arguments to transfer commands. If the arguments are None, reads in the settings
and returns the values from the files, otherwise
:param local_subjects_path:
:param remote_subjects_path:
:return:
"""
iblrig_settings = load_settings_yaml()
local_subjects_path = local_subjects_path or Path(iblrig_settings['iblrig_local_data_path'])
remote_subjects_path = remote_subjects_path or Path(iblrig_settings['iblrig_remote_data_path']).joinpath('Subjects')
return local_subjects_path, remote_subjects_path


def load_settings_yaml(file_name='iblrig_settings.yaml', mode='raise'):
"""
Load a yaml file from the settings folder.
Expand Down
25 changes: 16 additions & 9 deletions iblrig/test/test_transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import tempfile
import unittest

from ibllib.io import session_params

from iblrig_tasks._iblrig_tasks_trainingChoiceWorld.task import Session
from iblrig.test.base import TASK_KWARGS
from iblrig.transfer_experiments import SessionCopier, VideoCopier, EphysCopier
from iblrig.transfer_experiments import BehaviorCopier, VideoCopier, EphysCopier
import iblrig.commands
import iblrig.raw_data_loaders

Expand Down Expand Up @@ -57,8 +59,8 @@ def test_behavior_copy_complete_session(self):
session.paths.SESSION_FOLDER.joinpath('transfer_me.flag').touch()
iblrig.commands.transfer_data(local_subjects_path=session.paths.LOCAL_SUBJECT_FOLDER,
remote_subjects_path=session.paths.REMOTE_SUBJECT_FOLDER)
sc = SessionCopier(session_path=session.paths.SESSION_FOLDER,
remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
sc = BehaviorCopier(session_path=session.paths.SESSION_FOLDER,
remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
self.assertEqual(sc.state, 3)

def test_behavior_do_not_copy_dummy_sessions(self):
Expand All @@ -75,22 +77,22 @@ def test_behavior_do_not_copy_dummy_sessions(self):
local_subjects_path=session.paths.LOCAL_SUBJECT_FOLDER,
remote_subjects_path=session.paths.REMOTE_SUBJECT_FOLDER
)
sc = SessionCopier(
sc = BehaviorCopier(
session_path=session.paths.SESSION_FOLDER,
remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
self.assertFalse(sc.remote_session_path.exists())


class TestUnitTransferExperiments(unittest.TestCase):
"""
UnitTest the SessionCopier, VideoCopier and EphysCopier classes and methods
UnitTest the BehaviorCopier, VideoCopier and EphysCopier classes and methods
Unlike the integration test, the sessions here are made from scratch using an actual instantiated session
"""
def test_behavior_copy(self):
with tempfile.TemporaryDirectory() as td:
session = _create_behavior_session(td)
sc = SessionCopier(session_path=session.paths.SESSION_FOLDER,
remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
sc = BehaviorCopier(session_path=session.paths.SESSION_FOLDER,
remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
assert sc.state == 1
sc.copy_collections()
assert sc.state == 2
Expand Down Expand Up @@ -141,8 +143,8 @@ def test_behavior_ephys_video_copy(self):
"""
Test the copiers
"""
sc = SessionCopier(session_path=session.paths.SESSION_FOLDER,
remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
sc = BehaviorCopier(session_path=session.paths.SESSION_FOLDER,
remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
assert sc.glob_file_remote_copy_status().suffix == '.status_pending'
assert sc.state == 1
sc.copy_collections()
Expand All @@ -154,6 +156,7 @@ def test_behavior_ephys_video_copy(self):
assert sc.state == 2 # here we still don't have all devides so this won't cut it and we stay in state 2

vc = VideoCopier(session_path=folder_session_video, remote_subjects_folder=session.paths.REMOTE_SUBJECT_FOLDER)
vc.create_video_stub()
assert vc.state == 0
vc.initialize_experiment()
assert vc.state == 1
Expand All @@ -176,3 +179,7 @@ def test_behavior_ephys_video_copy(self):
# this time it's all there and we move on
sc.finalize_copy(number_of_expected_devices=3)
assert sc.state == 3
final_experiment_description = session_params.read_params(sc.remote_session_path)
assert len(final_experiment_description['tasks']) == 1
assert set(final_experiment_description['devices']['cameras'].keys()) == set(['body', 'left', 'right'])
assert set(final_experiment_description['sync'].keys()) == set(['nidq'])
45 changes: 37 additions & 8 deletions iblrig/transfer_experiments.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import abc
from pathlib import Path
import shutil
import traceback
Expand All @@ -10,9 +11,9 @@
log = setup_logger('iblrig', level='INFO')


class SessionCopier():
tag = 'behavior'
class SessionCopier(abc.ABC):
assert_connect_on_init = False
_experiment_description = None

def __init__(self, session_path, remote_subjects_folder=None, tag=None):
self.tag = tag or self.tag
Expand All @@ -36,10 +37,10 @@ def run(self, number_of_expected_devices=None):
if self.state == -1: # this case is not implemented automatically and corresponds to a hard reset
log.info(f"{self.state}, {self.session_path}")
shutil.rmtree(self.remote_session_path)
self.initialize_experiment(session_params.read_params(self.session_path))
self.initialize_experiment()
if self.state == 0: # the session hasn't even been initialzed: copy the stub to the remote
log.info(f"{self.state}, {self.session_path}")
self.initialize_experiment(session_params.read_params(self.session_path))
self.initialize_experiment()
if self.state == 1: # the session
log.info(f"{self.state}, {self.session_path}")
self.copy_collections()
Expand Down Expand Up @@ -76,7 +77,7 @@ def get_state(self):

@property
def experiment_description(self):
return session_params.read_params(self.session_path)
return self._experiment_description

@property
def remote_session_path(self):
Expand Down Expand Up @@ -151,7 +152,7 @@ def copy_collections(self):
self.session_path.joinpath('transfer_me.flag').unlink()
return status

def initialize_experiment(self, acquisition_description=None, overwrite=False):
def initialize_experiment(self, acquisition_description=None, overwrite=True):
"""
Copy acquisition description yaml to the server and local transfers folder.
Expand All @@ -162,6 +163,9 @@ def initialize_experiment(self, acquisition_description=None, overwrite=False):
overwrite : bool
If true, overwrite any existing file with the new one, otherwise, update the existing file.
"""
if acquisition_description is None:
acquisition_description = self.experiment_description

assert acquisition_description

# First attempt to add the remote description stub to the _device folder on the remote session
Expand All @@ -174,6 +178,8 @@ def initialize_experiment(self, acquisition_description=None, overwrite=False):
try:
merged_description = session_params.merge_params(previous_description, acquisition_description)
session_params.write_yaml(remote_stub_file, merged_description)
for f in remote_stub_file.parent.glob(remote_stub_file.stem + '.status_*'):
f.unlink()
remote_stub_file.with_suffix('.status_pending').touch()
log.info(f'Written data to remote device at: {remote_stub_file}.')
except Exception as e:
Expand Down Expand Up @@ -219,13 +225,35 @@ class VideoCopier(SessionCopier):
tag = 'video'
assert_connect_on_init = True

def create_video_stub(self, nvideos=None):
match len(list(self.session_path.joinpath('raw_video_data').glob('*.avi'))):
case 3:
stub_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'cameras',
'body_left_right.yaml')
case 1:
stub_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'cameras', 'left.yaml')
acquisition_description = session_params.read_params(stub_file)
session_params.write_params(self.session_path, acquisition_description)

def initialize_experiment(self, acquisition_description=None, **kwargs):
if not acquisition_description:
stub_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'cameras', 'body_left_right.yaml')
acquisition_description = session_params.read_params(stub_file)
# creates the acquisition description stub if not found, and then read it
if not self.file_experiment_description.exists():
self.create_video_stub()
acquisition_description = session_params.read_params(self.file_experiment_description)
self._experiment_description = acquisition_description
super(VideoCopier, self).initialize_experiment(acquisition_description=acquisition_description, **kwargs)


class BehaviorCopier(SessionCopier):
tag = 'behavior'
assert_connect_on_init = False

@property
def experiment_description(self):
return session_params.read_params(self.session_path)


class EphysCopier(SessionCopier):
tag = 'spikeglx'
assert_connect_on_init = True
Expand All @@ -240,6 +268,7 @@ def initialize_experiment(self, acquisition_description=None, nprobes=None, **kw
sync_file = Path(iblrig.__file__).parent.joinpath('device_descriptions', 'sync', 'nidq.yaml')
acquisition_description = session_params.read_params(stub_file)
acquisition_description.update(session_params.read_params(sync_file))
self._experiment_description = acquisition_description
super(EphysCopier, self).initialize_experiment(acquisition_description=acquisition_description, **kwargs)

def _copy_collections(self):
Expand Down

0 comments on commit eec978a

Please sign in to comment.