Skip to content

Commit

Permalink
Merge pull request #709 from int-brain-lab/iblrigv8dev
Browse files Browse the repository at this point in the history
8.22.2
  • Loading branch information
bimac authored Aug 21, 2024
2 parents 022f068 + 6ffcbad commit db04546
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 37 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

8.23.0
------
* hardware validation: check for unexpected events on Bpod's digital input ports
* hardware validation: frame2ttl

-------------------------------

8.22.1
------
* get past sessions bugfix when newer sessions are present only on the remote server
Expand All @@ -9,6 +16,8 @@ Changelog
------
* add UI components for selecting remote devices

-------------------------------

8.21.2
------
* fix: remote devices show as task parameters (regression)
Expand Down
2 changes: 1 addition & 1 deletion iblrig/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# 5) git tag the release in accordance to the version number below (after merge!)
# >>> git tag 8.15.6
# >>> git push origin --tags
__version__ = '8.22.1'
__version__ = '8.23.0'


from iblrig.version_management import get_detailed_version_string
Expand Down
2 changes: 1 addition & 1 deletion iblrig/frame2ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def threshold_dark(self, value: int) -> None:

@property
def threshold_light(self) -> int:
return self._threshold_dark
return self._threshold_light

@threshold_light.setter
def threshold_light(self, value: int) -> None:
Expand Down
3 changes: 2 additions & 1 deletion iblrig/gui/frame2ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import date

from PyQt5 import QtCore, QtGui, QtTest, QtWidgets
from PyQt5.QtWidgets import QWidget

from iblrig.frame2ttl import Frame2TTL
from iblrig.gui.tools import Worker
Expand Down Expand Up @@ -70,7 +71,7 @@ def _on_calibrate_dark_result(self, result: tuple[int, bool]):
class Frame2TTLCalibrationTarget(QtWidgets.QDialog):
def __init__(
self,
parent,
parent: QWidget | None = None,
color: QtGui.QColor = QtGui.QColorConstants.White,
screen_index: int | None = None,
width: int | None = None,
Expand Down
212 changes: 187 additions & 25 deletions iblrig/hardware_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
import sounddevice
import usb
from dateutil.relativedelta import relativedelta
from PyQt5.QtGui import QColorConstants
from PyQt5.QtWidgets import QApplication
from serial import Serial, SerialException
from serial.serialutil import SerialTimeoutException
from serial.tools import list_ports
from serial.tools.list_ports_common import ListPortInfo

from iblrig.base_tasks import BpodMixin, SoundMixin
from iblrig.constants import BASE_PATH, HAS_PYSPIN, HAS_SPINNAKER, IS_GIT
from iblrig.frame2ttl import Frame2TTL
from iblrig.hardware import Bpod
from iblrig.path_helper import load_pydantic_yaml
from iblrig.pydantic_definitions import HardwareSettings, RigSettings
Expand Down Expand Up @@ -92,10 +96,13 @@ def run(self, *args, **kwargs) -> Generator[Result, None, bool]:

def _get_bpod(self) -> Generator[Result, None, Bpod | None]:
if self.hardware_settings.device_bpod.COM_BPOD is None:
yield Result(Status.INFO, f'Cannot complete validation of {self.name} without Bpod')
yield Result(Status.WARN, f'Cannot complete validation of {self.name} without Bpod')
return None
try:
return Bpod(self.hardware_settings.device_bpod.COM_BPOD, skip_initialization=True)
disabled_ports = [x - 1 for x in self.hardware_settings['device_bpod']['DISABLE_BEHAVIOR_INPUT_PORTS']]
return Bpod(
self.hardware_settings.device_bpod.COM_BPOD, skip_initialization=True, disable_behavior_ports=disabled_ports
)
except Exception as e:
yield Result(Status.FAIL, f'Cannot complete validation of {self.name}: connection to Bpod failed', exception=e)
return None
Expand Down Expand Up @@ -144,7 +151,7 @@ def process(self, results: Result) -> Result:

class ValidatorSerial(Validator):
port_properties: dict[str, Any] = {}
serial_queries: None | dict[tuple[object, int], bytes] = None
serial_queries: None | dict[tuple[bytes, int], bytes] = None

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -159,7 +166,11 @@ def port_info(self) -> ListPortInfo | None:

def _run(self):
if self.port is None:
yield Result(Status.SKIP, f'No serial port defined for {self.name}')
yield Result(
Status.FAIL,
f'No serial port defined for {self.name}',
solution='Check serial port setting in hardware_settings.yaml',
)
return False
elif next((p for p in list_ports.comports() if p.device == self.port), None) is None:
yield Result(
Expand All @@ -180,7 +191,7 @@ def _run(self):
except SerialException as e:
yield Result(
Status.FAIL,
f'{self.name} on {self.port} cannot be connected to',
f'Serial device on {self.port} cannot be connected to',
solution='Try power-cycling the device',
exception=e,
)
Expand All @@ -193,13 +204,22 @@ def _run(self):

# query the devices for characteristic responses
if passed and getattr(self, 'serial_queries', None) is not None:
with Serial(self.port, timeout=1) as ser:
for query, regex_pattern in self.serial_queries.items():
ser.write(query[0])
return_string = ser.read(query[1])
ser.flush()
if not (passed := bool(re.search(regex_pattern, return_string))):
break
with Serial(self.port, timeout=1, write_timeout=1) as ser:
try:
for query, regex_pattern in self.serial_queries.items():
ser.write(query[0])
return_string = ser.read(query[1])
ser.flush()
if not (passed := bool(re.search(regex_pattern, return_string))):
break
except SerialTimeoutException as e:
yield Result(
Status.FAIL,
f'Writing to serial device on {self.port} timed out',
solution='Try power-cycling the device',
exception=e,
)
return False

if passed:
yield Result(Status.PASS, f'Serial device positively identified as {self.name}')
Expand Down Expand Up @@ -339,7 +359,10 @@ def _run(self):

# try to connect to Bpod
try:
bpod = Bpod(self.hardware_settings.device_bpod.COM_BPOD, skip_initialization=False)
disabled_ports = [x - 1 for x in self.hardware_settings['device_bpod']['DISABLE_BEHAVIOR_INPUT_PORTS']]
bpod = Bpod(
self.hardware_settings.device_bpod.COM_BPOD, skip_initialization=False, disable_behavior_ports=disabled_ports
)
yield Result(Status.PASS, 'Successfully connected to Bpod using pybpod')
except Exception as e:
yield Result(
Expand All @@ -351,6 +374,36 @@ def _run(self):
for module in bpod.modules:
if module.connected:
yield Result(Status.INFO, f'Module on port #{module.serial_port}: "{module.name}"')

# run a simple state machine to collect events on digital inputs
try:
sma = StateMachine(bpod)
sma.add_state(sma.add_state('state', 0.2, {'Tup': 'exit'}))
bpod.send_state_machine(sma)
bpod.run_state_machine(sma)
bpod_data = bpod.session.current_trial.export()
events: dict[str, list[float]] = bpod_data.get('Events timestamps', {})
except Exception as e:
yield Result(Status.FAIL, 'Error running state-machine', exception=e)
return False

# check for (un)expected input events
for event_name, timestamps in sorted(events.items()):
if event_name.endswith('Out') or event_name.endswith('Low') or event_name == 'Tup':
continue
rate = np.mean(1 / np.diff(timestamps))
port = re.sub('(In)|(High)', '', event_name)
port = re.sub('Port', 'Behavior Port ', port)
port = re.sub('BNC', 'BNC Input ', port)
if event_name in ['Port1In']:
yield Result(Status.INFO, f"Expected input events on Bpod's '{port}' at ~{rate:0.0f} Hz")
else:
yield Result(
Status.FAIL,
f"Unexpected input events on Bpod's '{port}' at ~{rate:0.0f} Hz",
solution=f"Check wiring / device connected on '{port}'.",
)

return True


Expand Down Expand Up @@ -502,6 +555,95 @@ def _run(self):
return False


class ValidatorFrame2TTL(ValidatorSerial):
_name = 'Frame2TTL'
serial_queries = {(b'C', 1): b'\xda'}

@property
def port(self):
return self.hardware_settings.device_frame2ttl.COM_F2TTL

def _run(self):
# invoke ValidateSerialDevice._run()
success = yield from super()._run()
if not success:
return False

# obtain information on versions and thresholds
with Frame2TTL(
port=self.port,
threshold_light=self.hardware_settings.device_frame2ttl.F2TTL_LIGHT_THRESH,
threshold_dark=self.hardware_settings.device_frame2ttl.F2TTL_DARK_THRESH,
) as frame2ttl:
yield Result(Status.INFO, f'Hardware Version: {frame2ttl.hw_version}')
yield Result(Status.INFO, f'Firmware Version: {frame2ttl.fw_version}')
yield Result(Status.INFO, f'Light Threshold: {frame2ttl.threshold_light} {frame2ttl.unit_str}')
yield Result(Status.INFO, f'Dark Threshold: {frame2ttl.threshold_dark} {frame2ttl.unit_str}')

# try to get Bpod
bpod = yield from self._get_bpod()
if bpod is None:
return False

# prepare test of TTL output
from iblrig.gui.frame2ttl import Frame2TTLCalibrationTarget

app = QApplication.instance()
if app_created := app is None:
app = QApplication([])
calibration_target = Frame2TTLCalibrationTarget(color=QColorConstants.Black)
calibration_target.show()

# Define state-machine
def softcode_handler(softcode: int):
nonlocal calibration_target
calibration_target.color = QColorConstants.White if softcode == 1 else QColorConstants.Black

original_softcode_handler = bpod.softcode_handler_function
bpod.softcode_handler_function = softcode_handler
sma = StateMachine(bpod)
sma.add_state(
state_name='white',
state_timer=1,
state_change_conditions={'Tup': 'black', 'BNC1High': 'black'},
output_actions=[('SoftCode', 1)],
)
sma.add_state(
state_name='black',
state_timer=1,
state_change_conditions={'Tup': 'exit', 'BNC1Low': 'exit'},
output_actions=[('SoftCode', 2)],
)

# Run state-machine
try:
bpod.send_state_machine(sma)
bpod.run_state_machine(sma)
bpod_data = bpod.session.current_trial.export()
events: dict[str, list[float]] = bpod_data.get('Events timestamps', {})
except Exception as e:
yield Result(Status.FAIL, 'Error running state-machine', exception=e)
return False
finally:
bpod.softcode_handler_function = original_softcode_handler
calibration_target.close()
if app_created:
app.quit()

# Evaluate results
if (n_events := len(events.get('BNC1High', [])) + len(events.get('BNC1Low', []))) == 2:
yield Result(Status.PASS, "Detected the correct number of events on Bpod's 'TTL Input 1'")
return True
else:
yield Result(
Status.FAIL,
('No' if n_events == 0 else 'too few' if n_events < 2 else 'too many')
+ " events detected on Bpod's 'BNC Input 1'",
solution='Check for proper installation and calibration of Frame2TTL module',
)
return False


class ValidatorGit(Validator):
_name = 'Git'

Expand Down Expand Up @@ -559,6 +701,9 @@ def _run(self):
def create_session(self):
pass

def send_spacers(self):
pass


class ValidatorSound(ValidatorSerial):
_name = 'Sound'
Expand Down Expand Up @@ -635,12 +780,14 @@ def _run(self):

# run state machine
if self.interactive:
logging.disable(logging.INFO)
task = _SoundCheckTask(subject='toto')
task.start_hardware()
sma = task.get_state_machine()
task.bpod.send_state_machine(sma)
yield Result(Status.INFO, 'Playing audible sound - can you hear it?')
task.bpod.run_state_machine(sma)
logging.disable(logging.NOTSET)
bpod_data = task.bpod.session.current_trial.export()
if (n_events := len(bpod_data['Events timestamps'].get('BNC2High', []))) == 0:
yield Result(
Expand All @@ -649,7 +796,7 @@ def _run(self):
solution="Make sure to connect the sound-card to Bpod's TTL Input 2",
)
elif n_events == 1:
yield Result(Status.PASS, "Detected Event on Bpod's TTL Input 2")
yield Result(Status.PASS, "Detected Event on Bpod's 'TTL Input 2'")
else:
yield Result(
Status.FAIL,
Expand All @@ -672,23 +819,38 @@ def run_all_validators(

def run_all_validators_cli():
validators = get_all_validators()
hardware_settings = load_pydantic_yaml(HardwareSettings)
iblrig_settings = load_pydantic_yaml(RigSettings)
fail = 0
warn = 0
for validator in validators:
v = validator()
v = validator(hardware_settings=hardware_settings, iblrig_settings=iblrig_settings, interactive=True)
print(f'{ANSI.BOLD + ANSI.UNDERLINE + v.name + ANSI.END}')
for result in v.run():
if result.status == Status.FAIL:
color = ANSI.RED + ANSI.BOLD
fail += 1
elif result.status == Status.WARN:
color = ANSI.YELLOW + ANSI.BOLD
warn += 1
else:
color = ANSI.END
print(f'{color}- {result.message}{ANSI.END}')
match result.status:
case Status.PASS:
color = ANSI.GREEN
symbol = '✓'
case Status.FAIL:
color = ANSI.RED + ANSI.BOLD
fail += 1
symbol = '✗'
case Status.WARN:
color = ANSI.YELLOW + ANSI.BOLD
warn += 1
symbol = '!'
case Status.INFO:
color = ANSI.BLUE
symbol = 'i'
case Status.SKIP:
color = ANSI.WHITE
symbol = '∅'
case _:
color = ANSI.END
symbol = '?'
print(f'{color} {symbol} {result.message}{ANSI.END}')
if result.solution is not None and len(result.solution) > 0:
print(f'{color} Suggestion: {result.solution}{ANSI.END}')
print(f'{color} Suggestion: {result.solution}{ANSI.END}')
print('')
if fail > 0:
print(ANSI.RED + ANSI.BOLD + f'{fail} validation{"s" if fail > 1 else ""} failed.')
Expand Down
2 changes: 1 addition & 1 deletion iblrig/pydantic_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class HardwareSettingsRotaryEncoder(BunchModel):


class HardwareSettingsScreen(BunchModel):
DISPLAY_IDX: Literal[0, 1]
DISPLAY_IDX: int = Field(gte=0, lte=1) # -1 = Default, 0 = First, 1 = Second, 2 = Third, etc
SCREEN_FREQ_TARGET: int = Field(gt=0)
SCREEN_FREQ_TEST_DATE: date | None = None
SCREEN_FREQ_TEST_STATUS: str | None = None
Expand Down
Loading

0 comments on commit db04546

Please sign in to comment.