Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Udp #747

Open
wants to merge 11 commits into
base: iblrigv8dev
Choose a base branch
from
Open

Udp #747

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ __pycache__/
\.sonarlint/
\.pytest*
.pytest*
*.code-workspace
*.pyc
docs/_build
scratch/
Expand Down
9 changes: 0 additions & 9 deletions iblrig/base_choice_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,6 @@ def extra_parser():
required=False,
help='initial delay before starting the first trial (default: 0 min)',
)
parser.add_argument(
'--remote',
dest='remote_rigs',
type=str,
required=False,
action='append',
nargs='+',
help='specify one of the remote rigs to interact with over the network',
)
return parser

def start_hardware(self):
Expand Down
66 changes: 51 additions & 15 deletions iblrig/base_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,12 @@ def __init__(
self.init_datetime = datetime.datetime.now()

# loads in the settings: first load the files, then update with the input argument if provided
self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
if hardware_settings is not None:
self.hardware_settings.update(hardware_settings)
HardwareSettings.model_validate(self.hardware_settings)
self.iblrig_settings: RigSettings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
if iblrig_settings is not None:
self.iblrig_settings.update(iblrig_settings)
RigSettings.model_validate(self.iblrig_settings)

self._load_settings(
file_hardware_settings=file_hardware_settings,
hardware_settings=hardware_settings,
file_iblrig_settings=file_iblrig_settings,
iblrig_settings=iblrig_settings,
)
self.wizard = wizard

# Load the tasks settings, from the task folder or override with the input argument
Expand Down Expand Up @@ -174,6 +171,18 @@ def __init__(
extractors=self.extractor_tasks,
)

def _load_settings(
self, file_hardware_settings=None, hardware_settings=None, file_iblrig_settings=None, iblrig_settings=None, **_
):
self.hardware_settings: HardwareSettings = load_pydantic_yaml(HardwareSettings, file_hardware_settings)
if hardware_settings is not None:
self.hardware_settings.update(hardware_settings)
HardwareSettings.model_validate(self.hardware_settings)
self.iblrig_settings: RigSettings = load_pydantic_yaml(RigSettings, file_iblrig_settings)
if iblrig_settings is not None:
self.iblrig_settings.update(iblrig_settings)
RigSettings.model_validate(self.iblrig_settings)

@classmethod
def get_task_file(cls) -> Path:
"""
Expand Down Expand Up @@ -1208,7 +1217,8 @@ def __init__(self, *_, remote_rigs=None, **kwargs):
if isinstance(remote_rigs, list):
# For now we flatten to list of remote rig names but could permit list of (name, URI) tuples
remote_rigs = list(filter(None, flatten(remote_rigs)))
all_remote_rigs = net.get_remote_devices(iblrig_settings=kwargs.get('iblrig_settings'))
self._load_settings(**kwargs)
all_remote_rigs = net.get_remote_devices(iblrig_settings=self.iblrig_settings)
if not set(remote_rigs).issubset(all_remote_rigs.keys()):
raise ValueError('Selected remote rigs not in remote rigs list')
remote_rigs = {k: v for k, v in all_remote_rigs.items() if k in remote_rigs}
Expand Down Expand Up @@ -1296,18 +1306,40 @@ def _init_paths(self, append: bool = False):
assert self.exp_ref
paths.SESSION_FOLDER = date_folder / f'{self.exp_ref["sequence"]:03}'
paths.TASK_COLLECTION = iblrig.path_helper.iterate_collection(paths.SESSION_FOLDER)
if append == paths.TASK_COLLECTION.endswith('00'):
raise ValueError(
f'Append value incorrect. Either remove previous task collections from '
f'{paths.SESSION_FOLDER}, or select append in GUI (--append arg in cli)'
)
# if append == paths.TASK_COLLECTION.endswith('00'):
# raise ValueError(
# f'Append value incorrect. Either remove previous task collections from '
# f'{paths.SESSION_FOLDER}, or select append in GUI (--append arg in cli)'
# )
log.critical('This is task number %i for %s', int(paths.TASK_COLLECTION.split('_')[-1]) + 1, self.exp_ref)

paths.SESSION_RAW_DATA_FOLDER = paths.SESSION_FOLDER.joinpath(paths.TASK_COLLECTION)
paths.DATA_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskData.raw.jsonable')
paths.SETTINGS_FILE_PATH = paths.SESSION_RAW_DATA_FOLDER.joinpath('_iblrig_taskSettings.raw.json')
self.session_info.SESSION_NUMBER = int(paths.SESSION_FOLDER.name)
return paths

@staticmethod
def extra_parser():
"""
Parse network arguments.

Namely adds the remote argument to the parser.

:return: argparse.parser()
"""
parser = super(NetworkSession, NetworkSession).extra_parser()
parser.add_argument(
'--remote',
dest='remote_rigs',
type=str,
required=False,
action='append',
nargs='+',
help='specify one of the remote rigs to interact with over the network',
)
return parser

def run(self):
"""Run session and report exceptions to remote services."""
self.start_mixin_network()
Expand Down Expand Up @@ -1399,6 +1431,7 @@ def init_mixin_network(self):
f'Running past or future sessions not currently supported. \n'
f'Please check the system date time settings on each rig.'
)
# TODO How to handle folder already existing before running UDP experiment?

# exp_ref = ConversionMixin.path2ref(self.paths['SESSION_FOLDER'], as_dict=False)
exp_ref = self.one.dict2ref(self.exp_ref)
Expand All @@ -1425,9 +1458,12 @@ def stop_mixin_network(self):

def cleanup_mixin_network(self):
"""Clean up services."""
log.info('Cleaning up network mixin')
self.remote_rigs.close()
if self.remote_rigs.is_connected:
log.warning('Failed to properly clean up network mixin')
else:
log.info('Cleaned up network mixin')


class SpontaneousSession(BaseSession):
Expand Down
10 changes: 8 additions & 2 deletions iblrig/net.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ def __init__(self, clients):
if any(self._clients):
self._thread = threading.Thread(target=asyncio.run, args=(self.listen(),), name='network_coms')
self._thread.start()
log.debug('waiting for connection with timeout 1 sec')
with self.connected:
success = self.connected.wait_for(lambda: self.is_connected is True, timeout=1)
if not success:
raise RuntimeError('Failed to connect to remote rigs')

@property
def is_connected(self) -> bool:
Expand Down Expand Up @@ -196,7 +201,8 @@ async def listen(self):
are sent to the remote services and the collated responses are added to the log.
Exits only after stop event is set. This should be called from a daemon thread.
"""
await self.create()
with self.connected:
await self.create()
while not self.stop_event.is_set():
if any(queue := sorted(self._queued)):
request_time = queue[0]
Expand All @@ -222,7 +228,7 @@ async def listen(self):
# yield x
#
# res = await anext(first(asyncio.as_completed(tasks), lambda r: r[-1]['main_sync']))
responses = await self.services.info(event, *args)
responses = await self.services.info(*args)
case net.base.ExpMessage.ALYX:
responses = await self.services.alyx(*args)
case _:
Expand Down
6 changes: 4 additions & 2 deletions iblrig/path_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ def get_local_and_remote_paths(
'remote_subjects_folder': PosixPath('Y:/Subjects')}
"""
# we only want to attempt to load the settings file if necessary
if (local_path is None) or (remote_path is None) or (lab is None):
iblrig_settings = load_pydantic_yaml(RigSettings) if iblrig_settings is None else iblrig_settings
if iblrig_settings is None and ((local_path is None) or (remote_path is None) or (lab is None)):
iblrig_settings = load_pydantic_yaml(RigSettings)
if isinstance(iblrig_settings, RigSettings):
iblrig_settings = iblrig_settings.model_dump()

paths = Bunch({'local_data_folder': local_path, 'remote_data_folder': remote_path})
if paths.local_data_folder is None:
Expand Down
58 changes: 2 additions & 56 deletions iblrig/test/test_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from iblrig import video # noqa
from iblrig.test.base import BaseTestCases # noqa
from iblrig.path_helper import load_pydantic_yaml, HARDWARE_SETTINGS_YAML, RIG_SETTINGS_YAML # noqa
from iblrig.path_helper import load_pydantic_yaml # noqa
from iblrig.pydantic_definitions import HardwareSettings # noqa


Expand All @@ -40,60 +40,6 @@ def test_download_from_alyx_or_flir(self, mock_os_rename, mock_hashfile, mock_aw
mock_os_rename.assert_called_once_with(Path('mocked_tmp_file'), expected_out_file)


class TestPrepareVideoSession(unittest.TestCase):
"""Test for iblrig.video.prepare_video_session."""

def setUp(self):
self.tmp = tempfile.TemporaryDirectory()
self.subject = 'foobar'
self.addCleanup(self.tmp.cleanup)
(input_mock := patch('builtins.input')).start()
self.addCleanup(input_mock.stop)

@patch('iblrig.video.EmptySession')
@patch('iblrig.video.HAS_PYSPIN', True)
@patch('iblrig.video.HAS_SPINNAKER', True)
@patch('iblrig.video.call_bonsai')
@patch('iblrig.video_pyspin.enable_camera_trigger')
def test_prepare_video_session(self, enable_camera_trigger, call_bonsai, session):
"""Test iblrig.video.prepare_video_session function."""
# Set up mock session folder
session_path = Path(self.tmp.name, self.subject, '2020-01-01', '001')
session().paths.SESSION_FOLDER = session_path
session_path.mkdir(parents=True)
# Set up remote path
remote_path = Path(self.tmp.name, 'remote', self.subject, '2020-01-01', '001')
session().paths.REMOTE_SUBJECT_FOLDER = remote_path
remote_path.mkdir(parents=True)
# Some test hardware settings
hws = load_pydantic_yaml(HardwareSettings, 'hardware_settings_template.yaml')
hws['device_cameras']['default']['right'] = hws['device_cameras']['default']['left']
session().hardware_settings = hws
workflows = hws['device_cameras']['default']['BONSAI_WORKFLOW']

video.prepare_video_session(self.subject, 'default')

# Validate calls
expected = [call(enable=False), call(enable=True), call(enable=False)]
enable_camera_trigger.assert_has_calls(expected)
raw_data_folder = session_path / 'raw_video_data'
expected_pars = {
'LeftCameraIndex': 1,
'RightCameraIndex': 1,
'FileNameLeft': str(raw_data_folder / '_iblrig_leftCamera.raw.avi'),
'FileNameLeftData': str(raw_data_folder / '_iblrig_leftCamera.frameData.bin'),
'FileNameRight': str(raw_data_folder / '_iblrig_rightCamera.raw.avi'),
'FileNameRightData': str(raw_data_folder / '_iblrig_rightCamera.frameData.bin'),
}
expected = [call(workflows.setup, ANY, debug=False), call(workflows.recording, expected_pars, wait=False, debug=False)]
call_bonsai.assert_has_calls(expected)

# Test config validation
self.assertRaises(ValueError, video.prepare_video_session, self.subject, 'training')
session().hardware_settings = hws.model_construct()
self.assertRaises(ValueError, video.prepare_video_session, self.subject, 'training')


class BaseCameraTest(BaseTestCases.CommonTestTask):
"""A base class for camera hardware test fixtures."""

Expand Down Expand Up @@ -217,7 +163,7 @@ def _end_bonsai_proc():
"""Return args with added side effect of signalling Bonsai subprocess termination."""
addr = '192.168.0.5:99998'
info_msg = ((net.base.ExpStatus.CONNECTED, {'subject_name': 'foo'}), addr, net.base.ExpMessage.EXPINFO)
init_msg = ({'exp_ref': f'{date.today()}_1_foo'}, addr, net.base.ExpMessage.EXPINIT)
init_msg = ([{'exp_ref': f'{date.today()}_1_foo'}], addr, net.base.ExpMessage.EXPINIT)
start_msg = ((f'{date.today()}_1_foo', {}), addr, net.base.ExpMessage.EXPSTART)
status_msg = (net.base.ExpStatus.RUNNING, addr, net.base.ExpMessage.EXPSTATUS)
for call_number, msg in enumerate((info_msg, init_msg, start_msg, status_msg, status_msg)):
Expand Down
Loading
Loading