Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Hilbert RVT (Harrison et al. 2020) #22

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions phys2denoise/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .chest_belt import hilbert_rvt

__all__ = ["hilbert_rvt"]
69 changes: 69 additions & 0 deletions phys2denoise/metrics/chest_belt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Metrics derived from chest belt (respiration) recordings."""
import numpy as np
from scipy import signal

from .utils import butter_bandpass_filter, interp_nans, split_complex


def hilbert_rvt(resp, sampling_freq):
"""Calculate a Hilbert transform-based version of the respiratory volume per unit time metric.

Parameters
----------
resp : array, shape (n_timepoints,)
The respiratory trace signal in a 1D array.
sampling_freq : float
The sampling frequency, in Hertz.

Returns
-------
rvt : array, shape (n_timepoints,)
The respiratory volume per unit time metric.

References
----------
* Harrison, S. J., Bianchi, S., Heinzle, J., Stephan, K. E., Iglesias, S.,
& Kasper, L. (2020). A Hilbert-based method for processing respiratory timeseries.
bioRxiv. https://doi.org/10.1101/2020.09.30.321562
"""
# Remove low frequency drifts (less than 0.01 Hz) from the breathing signal,
# and remove high-frequency noise above 2.0 Hz.
# Lowpass filter the data again to more aggressively remove high-frequency noise above 0.75 Hz.
# Simplified to just bandpass filtering between 0.01-0.75 in one go.
resp_filt = butter_bandpass_filter(
resp, fs=sampling_freq, lowcut=0.01, highcut=0.75, order=1
)

# Decompose the signal into magnitude and phase components via the Hilbert transform.
analytic_signal = signal.hilbert(resp_filt)
magn, phas = split_complex(analytic_signal)

for i in range(10):
# Linearly interpolate any periods where the phase time course decreases,
# using the procedure in Figure 2, to remove any artefactual negative frequencies.
bad_idx = phas < 0
phas[bad_idx] = np.nan
phas = interp_nans(phas)
# Reconstruct the oscillatory portion of the signal, cos(𝜙(𝑡)),
# and lowpass filter at 0.75 Hz to remove any resulting artefacts.
oscill = np.cos(phas)
oscill = butter_bandpass_filter(oscill, fs=sampling_freq, highcut=0.75)
phas = np.arccos(oscill)
Comment on lines +47 to +51
Copy link
Member Author

@tsalo tsalo Dec 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that a number of transformations could be done here, and I'm not sure why one was chosen over the others:

  1. phase to oscillatory portion of signal (as is done)
  2. phase + magnitude to analytic signal, to be split again after low-pass filtering (assuming filtfilt can work with complex-valued data).
  3. phase + magnitude to reconstructed original signal, to be transformed again using the Hilbert after low-pass filtering.
  4. Just filtering the phase signal, right?

# This procedure is repeated 10 times, with the new phase timecourse
# re-estimated from the filtered oscillatory signal.

# instantaneous breathing rate is temporal derivative of phase
# not sure why pi is here but it's in the preprint's formula
ibr = np.append(0, np.diff(phas)) / (2 * np.pi)
Comment on lines +56 to +57
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per @CesarCaballeroGaudes, this seems to normalize radians (see #10 (comment)).


# respiratory volume is twice signal amplitude
rv = 2 * magn

# low-pass filter both(?) at 0.2Hz
ibr = butter_bandpass_filter(ibr, fs=sampling_freq, highcut=0.75)
rv = butter_bandpass_filter(rv, fs=sampling_freq, highcut=0.75)
Comment on lines +62 to +64
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if both signals should be low-pass filtered here, or if just one should be.


# rvt is product of ibr and rv
rvt = ibr * rv

return rvt
167 changes: 167 additions & 0 deletions phys2denoise/metrics/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""Miscellaneous utility functions for metric calculations."""
import numpy as np
from scipy import signal


def get_butter_filter(fs, lowcut=None, highcut=None, order=5):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a Butterworth filter (+ filtfilt) the way to go?

"""Calculate the appropriate parameters for a Butterworth filter.

Parameters
----------
fs : float
Sampling frequency of data in Hertz.
lowcut : float or None, optional
Frequency (in Hertz) under which to remove in the filter.
If both lowcut and highcut are not None, then a bandpass filter is applied.
If lowcut is None and highcut is not, then a low-pass filter is applied.
If highcut is None and lowcut is not, then a high-pass filter is applied.
Either lowcut, highcut, or both must not be None.
highcut : float or None, optional
Frequency (in Hertz) above which to remove in the filter.
If both lowcut and highcut are not None, then a bandpass filter is applied.
If lowcut is None and highcut is not, then a low-pass filter is applied.
If highcut is None and lowcut is not, then a high-pass filter is applied.
Either lowcut, highcut, or both must not be None.
order : int, optional
Nonzero positive integer indicating order of the filter. Default is 1.

Returns
-------
b, a : float
Parameters from the Butterworth filter.

Notes
-----
From https://scipy-cookbook.readthedocs.io/items/ButterworthBandpass.html
"""
nyq = 0.5 * fs

if (lowcut is not None) and (highcut is not None):
low = lowcut / nyq
high = highcut / nyq
window = (low, high)
btype = "bandpass"
elif lowcut is not None:
window = lowcut / nyq
btype = "highpass"
elif highcut is not None:
window = highcut / nyq
btype = "lowpass"
elif (lowcut is None) and (highcut is None):
raise ValueError("Either lowcut or highcut must be specified.")

b, a = signal.butter(order, window, btype=btype)
return b, a


def butter_bandpass_filter(data, fs, lowcut=None, highcut=None, order=1):
"""Band/low/high-pass filter an array based on cut frequencies.

Parameters
----------
data : array, shape (n_timepoints,)
Data to filter.
fs : float
Sampling frequency of data in Hertz.
lowcut : float or None, optional
Frequency (in Hertz) under which to remove in the filter.
If both lowcut and highcut are not None, then a bandpass filter is applied.
If lowcut is None and highcut is not, then a low-pass filter is applied.
If highcut is None and lowcut is not, then a high-pass filter is applied.
Either lowcut, highcut, or both must not be None.
highcut : float or None, optional
Frequency (in Hertz) above which to remove in the filter.
If both lowcut and highcut are not None, then a bandpass filter is applied.
If lowcut is None and highcut is not, then a low-pass filter is applied.
If highcut is None and lowcut is not, then a high-pass filter is applied.
Either lowcut, highcut, or both must not be None.
order : int, optional
Nonzero positive integer indicating order of the filter. Default is 1.

Returns
-------
y : array, shape (n_timepoints,)
Filtered data.

Notes
-----
From https://scipy-cookbook.readthedocs.io/items/ButterworthBandpass.html
"""
b, a = get_butter_filter(fs, lowcut, highcut, order=order)
y = signal.filtfilt(b, a, data)
return y


def split_complex(complex_signal):
"""Split a complex-valued nifti image into magnitude and phase images.

From complex-flow
"""
real = complex_signal.real
imag = complex_signal.imag
mag = abs(complex_signal)
phase = to_phase(real, imag)
return mag, phase


def to_phase(real, imag):
"""Convert real and imaginary data to phase data.

Equivalent to cmath.phase.
https://www.eeweb.com/quizzes/convert-between-real-imaginary-and-magnitude-phase

From complex-flow
"""
phase = np.arctan2(imag, real)
return phase


def nan_helper(y):
"""Handle indices and logical indices of NaNs.

Parameters
----------
y : array, shape (n_timepoints,)
A 1D array with possible NaNs.

Returns
-------
nans : array, shape (n_timepoints,)
Logical indices of NaNs in y.
index : function
A function, with signature indices = index(logical_indices),
to convert logical indices of NaNs to 'equivalent' indices.

Examples
--------
>>> # linear interpolation of NaNs
>>> nans, x= nan_helper(y)
>>> y[nans]= np.interp(x(nans), x(~nans), y[~nans])

Notes
-----
From https://stackoverflow.com/a/6520696
"""
return np.isnan(y), lambda z: z.nonzero()[0]


def interp_nans(y):
"""Linearly interpolate across NaNs in a 1D array.

Parameters
----------
y : array, shape (n_timepoints,)
A 1D array with possible NaNs.

Returns
-------
y : array, shape (n_timepoints,)
A 1D array with no NaNs.

Notes
-----
From https://stackoverflow.com/a/6520696
"""
nans, x = nan_helper(y)
y[nans] = np.interp(x(nans), x(~nans), y[~nans])
return y