diff --git a/physioqc/metrics/multimodal.py b/physioqc/metrics/multimodal.py index 31a456f..b45645b 100644 --- a/physioqc/metrics/multimodal.py +++ b/physioqc/metrics/multimodal.py @@ -3,10 +3,11 @@ import warnings import numpy as np -import peakdet as pk +from peakdet import Physio +from peakdet.operations import peakfind_physio from scipy import signal -from .utils import has_peakfind_physio, physio_or_numpy +from .utils import has_peaks, physio_or_numpy @physio_or_numpy @@ -42,8 +43,7 @@ def std(signal): N-sized array :obj:`numpy.ndarray` Standard deviation of signal. """ - std_val = np.std(signal, axis=0) - return std_val + return np.std(signal, axis=0) @physio_or_numpy @@ -61,8 +61,7 @@ def mean(signal: np.array): N-sized array :obj:`numpy.ndarray` Mean of signal. """ - mean_val = np.mean(signal, axis=0) - return mean_val + return np.mean(signal, axis=0) @physio_or_numpy @@ -80,8 +79,7 @@ def tSNR(signal): N-sized array :obj:`numpy.ndarray` Temporal signal to noise ratio of signal. """ - tSNR_val = np.mean(signal, axis=0) / np.std(signal, axis=0) - return tSNR_val + return np.mean(signal, axis=0) / np.std(signal, axis=0) @physio_or_numpy @@ -99,8 +97,7 @@ def CoV(signal): N-sized array :obj:`numpy.ndarray` Temporal signal to noise ratio of signal. """ - cov_val = np.std(signal, axis=0) / np.mean(signal, axis=0) - return cov_val + return np.std(signal, axis=0) / np.mean(signal, axis=0) @physio_or_numpy @@ -118,8 +115,7 @@ def min(signal: np.array): N-sized array :obj:`numpy.ndarray` min of signal. """ - min_val = np.min(signal, axis=0) - return min_val + return np.min(signal, axis=0) @physio_or_numpy @@ -137,8 +133,7 @@ def max(signal: np.array): N-sized array :obj:`numpy.ndarray` max of signal. """ - max_val = np.max(signal, axis=0) - return max_val + return np.max(signal, axis=0) @physio_or_numpy @@ -160,9 +155,8 @@ def iqr(signal: np.array, q_high: float = 75, q_low: float = 25): iqr of the signal """ p_high, p_low = np.percentile(signal, [q_high, q_low], axis=0) - iqr_val = p_high - p_low - return iqr_val + return p_high - p_low @physio_or_numpy @@ -181,13 +175,11 @@ def percentile(signal: np.array, perc: float = 2): np.array percentile of the signal """ - perc_val = np.percentile(signal, axis=0, q=perc) - - return perc_val + return np.percentile(signal, q=perc, axis=0) def peak_detection( - data: pk.Physio, + data: Physio, peak_threshold: float = 0.1, peak_dist: float = 60.0, ): @@ -196,7 +188,7 @@ def peak_detection( Parameters ---------- - data : pk.Physio + data : Physio A peakdet Physio object peak_threshold : float, optional Threshold for peak detection, by default 0.1 @@ -205,58 +197,60 @@ def peak_detection( Returns ------- - pk.Physio - Updated pk.Physio class with peaks etc. + Physio + Updated Physio class with peaks etc. """ - ph = pk.operations.peakfind_physio(data, thresh=peak_threshold, dist=peak_dist) - - return ph + return peakfind_physio(data, thresh=peak_threshold, dist=peak_dist) -def peak_distance(ph: pk.Physio): - """Calculate the time between peaks (first derivative of onsets). +def peak_distance( + ph: Physio, + peak_threshold: float = 0.1, + peak_dist: float = 60.0, +): + """Calculate the time (in seconds) between peaks (first derivative of onsets). Parameters ---------- - ph : pk.Physio - A pk.Physio object, that contains peak information. + ph : Physio + A Physio object, that contains peak information. Returns ------- np.array np.array of shape [npeaks, ] """ - if not has_peakfind_physio(ph): + if not has_peaks(ph): warnings.warn("Peaks not estimated, estimating") - ph = peak_detection(ph) - - diff_peak = np.diff(ph.peaks, axis=0) + ph = peakfind_physio(ph, thresh=peak_threshold, dist=peak_dist) - return diff_peak + return np.diff(ph.peaks, axis=0) / ph.fs -def peak_amplitude(ph: pk.Physio): +def peak_amplitude( + ph: Physio, + peak_threshold: float = 0.1, + peak_dist: float = 60.0, +): """Return the amplitude for each peak in the ph.Physio object (peak - trough). Parameters ---------- - ph : pk.Physio - pk.Physio object with peak and trough information. + ph : Physio + Physio object with peak and trough information. Returns ------- np.array np.array of shape [npeaks - 1, ] """ - if not has_peakfind_physio(ph): + if not has_peaks(ph): warnings.warn("Peaks not estimated, estimating") - ph = peak_detection(ph) + ph = peakfind_physio(ph, thresh=peak_threshold, dist=peak_dist) # Assuming that peaks and troughs are aligned. Last peak has no trough. peak_amp = ph.data[ph.peaks[:-1]] trough_amp = ph.data[ph.troughs] - peak_amplitude = peak_amp - trough_amp - - return peak_amplitude + return peak_amp - trough_amp def power_spectrum(data): @@ -309,9 +303,7 @@ def energy(data, lowf=None, highf=None): # Define frequency band idx_band = np.logical_and(freqs >= lowf, freqs <= highf) - energy = np.sum(energy_density[idx_band]) - - return energy + return np.sum(energy_density[idx_band]) def fALFF(data, lowf=0, highf=0.5): @@ -346,9 +338,7 @@ def fALFF(data, lowf=0, highf=0.5): total_energy = energy(data) # Compute the relative energy - rel_energy = band_energy / total_energy - - return rel_energy + return band_energy / total_energy def freq_energy(data, thr=0.5): @@ -372,6 +362,4 @@ def freq_energy(data, thr=0.5): The value of the threshold has been selected randomly for now. Please update it with a more meaningful value. """ energy_nd = energy(data) - freq = np.argmax(energy_nd > thr) - - return freq + return np.argmax(energy_nd > thr) diff --git a/physioqc/metrics/utils.py b/physioqc/metrics/utils.py index 9e215a4..c0b8d74 100644 --- a/physioqc/metrics/utils.py +++ b/physioqc/metrics/utils.py @@ -3,6 +3,8 @@ import functools import logging +from peakdet.physio import Physio + LGR = logging.getLogger(__name__) LGR.setLevel(logging.INFO) @@ -58,8 +60,8 @@ def wrapper(*args, **kwargs): return wrapper -def has_peakfind_physio(signal) -> bool: - """Check if "peakfind_physio" is in signal's history. +def has_peaks(signal: Physio) -> bool: + """Check if the signal is a Physio object and has (at least 2) peaks. Parameters ---------- @@ -77,8 +79,9 @@ def has_peakfind_physio(signal) -> bool: Raises error if object does not have a history attribute. """ if not hasattr(signal, "history"): - raise AttributeError("Signal has to be a Physio object!") + raise AttributeError("Signal has to be a peakdet Physio object!") - has_peakfind = any(["peakfind_physio" in i for i in signal.history]) + if signal._metadata["peaks"].size == 1: + LGR.warn("Signal has only one peak! Better to rerun peak detection.") - return has_peakfind + return signal._metadata["peaks"].size > 1