diff --git a/phys2bids/phys2bids.py b/phys2bids/phys2bids.py index b31b6f70b..3d5ab6495 100644 --- a/phys2bids/phys2bids.py +++ b/phys2bids/phys2bids.py @@ -192,8 +192,8 @@ def phys2bids(filename, info=False, indir='.', outdir='.', heur_file=None, # Check options to make them internally coherent pt. II # #!# This can probably be done while parsing? indir = os.path.abspath(indir) - if chtrig < 1: - raise Exception('Wrong trigger channel. Channel indexing starts with 1!') + if chtrig < 0: + raise Exception('Wrong trigger channel. Channel indexing starts with 0!') filename, ftype = utils.check_input_type(filename, indir) diff --git a/phys2bids/physio_obj.py b/phys2bids/physio_obj.py index 0eea26765..e6467fd06 100644 --- a/phys2bids/physio_obj.py +++ b/phys2bids/physio_obj.py @@ -555,10 +555,11 @@ def print_info(self, filename): def auto_trigger_selection(self): """ - Find a trigger index matching the channels with a regular expresion. + Find a trigger index automatically. It compares the channel name with the the regular expressions stored - in TRIGGER_NAMES. + in TRIGGER_NAMES. If that fails a time-domain recognition of the + trigger signal is performed. Parameters ---------- @@ -569,9 +570,6 @@ def auto_trigger_selection(self): Exception More than one possible trigger channel was automatically found. - Exception - No trigger channel automatically found - Notes ----- Outcome: @@ -594,10 +592,25 @@ def auto_trigger_selection(self): 'Please run phys2bids specifying the -chtrig argument.') else: self.trigger_idx = indexes[0] - LGR.info(f'{self.ch_name[self.trigger_idx]} selected as trigger channel') else: - raise Exception('No trigger channel automatically found. Please run phys2bids ' - 'specifying the -chtrig argument.') + # Time-domain automatic trigger detection + + # Create numpy array with all channels (excluding time) + channel_ts = np.array(self.timeseries[1:]) + + # Normalize each signal to [0,1] + min_ts = np.min(channel_ts, axis=1)[:, None] + max_ts = np.max(channel_ts, axis=1)[:, None] + channel_ts = (channel_ts - min_ts) / (max_ts - min_ts) + + # Compute distance to the closest signal limit (0 or 1) + distance = np.minimum(abs(channel_ts - 0), abs(channel_ts - 1)) + distance_mean = np.mean(distance, axis=1) + + # Set the trigger as the channel with the smallest distance + self.trigger_idx = np.nanargmin(distance_mean) + 1 + + LGR.info(f'{self.ch_name[self.trigger_idx]} selected as trigger channel') class BlueprintOutput(): diff --git a/phys2bids/tests/test_phys2bids.py b/phys2bids/tests/test_phys2bids.py index c37ee3f29..6e2688387 100644 --- a/phys2bids/tests/test_phys2bids.py +++ b/phys2bids/tests/test_phys2bids.py @@ -47,7 +47,7 @@ def test_print_json(tmpdir): def test_raise_exception(samefreq_full_acq_file): test_path, test_filename = os.path.split(samefreq_full_acq_file) with raises(Exception) as errorinfo: - phys2bids.phys2bids(filename=test_filename, indir=test_path, outdir=test_path, chtrig=0) + phys2bids.phys2bids(filename=test_filename, indir=test_path, outdir=test_path, chtrig=-1) assert 'Wrong trigger' in str(errorinfo.value) with raises(Exception) as errorinfo: diff --git a/phys2bids/tests/test_physio_obj.py b/phys2bids/tests/test_physio_obj.py index fb6d61f53..2c5f6dfcc 100644 --- a/phys2bids/tests/test_physio_obj.py +++ b/phys2bids/tests/test_physio_obj.py @@ -261,7 +261,7 @@ def test_BlueprintOutput(): assert blueprint_out == blueprint_out -def test_auto_trigger_selection(caplog): +def test_auto_trigger_selection_text(caplog): """Test auto_trigger_selection.""" test_time = np.array([0, 1, 2, 3, 4]) test_trigger = np.array([0, 1, 2, 3, 4]) @@ -284,14 +284,36 @@ def test_auto_trigger_selection(caplog): test_units, test_chtrig) assert phys_in.trigger_idx == 1 # test when no trigger is found - test_chn_name = ['time', 'TRIGGAH', 'half', 'CO2', 'CO 2', 'strigose'] - with raises(Exception) as errorinfo: - phys_in = po.BlueprintInput(test_timeseries, test_freq, test_chn_name, - test_units, test_chtrig) - assert 'No trigger channel automatically found' in str(errorinfo.value) - # test when no trigger is found test_chn_name = ['time', 'trigger', 'TRIGGER', 'CO2', 'CO 2', 'strigose'] with raises(Exception) as errorinfo: phys_in = po.BlueprintInput(test_timeseries, test_freq, test_chn_name, test_units, test_chtrig) assert 'More than one possible trigger channel' in str(errorinfo.value) + + +def test_auto_trigger_selection_time(): + """Test auto_trigger_selection in time domain.""" + # Simulate 10 s of a trigger, O2 and ECG + T = 10 + nSamp = 100 + fs = nSamp/T + test_time = np.linspace(0, T, nSamp) + test_freq = [fs, fs, fs, fs] + + # O2 as a sinusoidal of 0.5 Hz + test_O2 = np.sin(2*np.pi*0.5*test_time) + # ECG as a sinusoidal with 1.5 Hz + test_ecg = np.sin(2*np.pi*1.5*test_time) + # Trigger as a binary signal + test_trigger = np.zeros(nSamp) + test_trigger[1:nSamp:4] = 1 + + test_timeseries = [test_time, test_O2, test_ecg, test_trigger] + test_chn_name = ['time', 'O2', 'ecg', 'tiger'] + test_units = ['s', 'V', 'V', 'V'] + + # test when chtrig is 0 and the trigger is not recognized by text matching: + test_chtrig = 0 + phys_in = po.BlueprintInput(test_timeseries, test_freq, test_chn_name, + test_units, test_chtrig) + assert phys_in.trigger_idx == 3