From 0fb5624aec94530291dbb33f90444fac89cd4892 Mon Sep 17 00:00:00 2001 From: Rolf Verberg Date: Thu, 29 Feb 2024 13:49:44 -0500 Subject: [PATCH] fix: more robust peak finding in EDD energy calibration --- CHAP/edd/processor.py | 189 +++++++++++++++++++----------------------- 1 file changed, 87 insertions(+), 102 deletions(-) diff --git a/CHAP/edd/processor.py b/CHAP/edd/processor.py index c7d92da..55431a8 100755 --- a/CHAP/edd/processor.py +++ b/CHAP/edd/processor.py @@ -927,8 +927,8 @@ def calibrate(self, txt = 'Calibrated Values:\n\n' \ + f'Takeoff Angle:\n {tth:.5f}$^\circ$' if True or recalibrate_energy: - txt += f'\n\nSlope:\n {slope:.5f}\n\n' \ - + f'Intercept:\n {intercept:.5f} $keV$' + txt += f'\n\nSlope:\n {slope:.5f}\n\n' \ + f'Intercept:\n {intercept:.5f}' axs[1,1].text( 0.98, 0.02, txt, ha='right', va='bottom', ma='left', @@ -969,6 +969,7 @@ class MCAEnergyCalibrationProcessor(Processor): def process(self, data, peak_energies, + max_peak_index, config=None, fit_index_ranges=None, peak_index_fit_delta=1.0, @@ -993,7 +994,10 @@ def process(self, :param peak_energies: Theoretical locations of peaks to use for calibrating the MCA channel energies. It is _strongly_ recommended to use fluorescence peaks. - :type peak_energies: list[float] + :type peak_energies: list[float], optional + :param max_peak_index: Index of the peak in `peak_energies` + with the highest amplitude + :type max_peak_index: int :param config: Initialization parameters for an instance of CHAP.edd.models.MCACeriaCalibrationConfig, defaults to `None`. @@ -1002,13 +1006,13 @@ def process(self, channel index ranges to include when performing a fit of the given peaks to the provied MCA spectrum. Use this parameter or select it interactively by running a pipeline - with `config.interactive: True`. Defaults to `None`. - :type fit_index_ranges: Optional[list[list[int]]] + with `config.interactive: True`. + :type fit_index_ranges: list[list[int]], optional :param peak_index_fit_delta: Set boundaries on the fit peak centers when performing the fit. The min/max possible values for the peak centers will be the initial values ± `peak_index_fit_delta`. Defaults to `20`. - :type peak_index_fit_delta: int + :type peak_index_fit_delta: int, optional :param max_energy_kev: Maximum channel energy of the MCA in keV, defaults to 200.0. :type max_energy_kev: float, optional @@ -1030,27 +1034,46 @@ def process(self, :rtype: dict """ # Local modules - from CHAP.utils.general import is_num_series + from CHAP.utils.general import ( + is_int, + is_int_pair, + is_num_series, + ) - # Validate arguments: peak_energies + # Validate arguments: fit_ranges & interactive if not is_num_series(peak_energies, gt=0): self.logger.exception( - ValueError('Invalid parameter `peak_energies`: ' - f'{peak_energies}'), exc_info=False) + ValueError( + f'Invalid parameter `peak_energies`: {peak_energies}'), + exc_info=False) if len(peak_energies) < 2: self.logger.exception( ValueError('Invalid parameter `peak_energies`: ' f'{peak_energies} (at least two values required)'), - exc_info=False) - peak_energies.sort() - - # Validate arguments: fit_ranges & interactive + exc_info=False) + if not is_int(max_peak_index, ge=0, lt=len(peak_energies)): + self.logger.exception( + ValueError( + f'Invalid parameter `max_peak_index`: {max_peak_index}'), + exc_info=False) if fit_index_ranges is None and not interactive: self.logger.exception( RuntimeError( 'If `fit_index_ranges` is not explicitly provided, ' + self.__class__.__name__ - + ' must be run with `interactive=True`.'), exc_info=False) + + ' must be run with `interactive=True`.'), + exc_info=False) + if (fit_index_ranges is not None + and (not isinstance(fit_index_ranges, list) + or any(not is_int_pair(v, ge=0) + for v in fit_index_ranges))): + self.logger.exception( + ValueError('Invalid parameter `fit_index_ranges`: ' + f'{fit_index_ranges}'), + exc_info=False) + max_peak_energy = peak_energies[max_peak_index] + peak_energies.sort() + max_peak_index = peak_energies.index(max_peak_energy) # Validate arguments: load the calibration configuration try: @@ -1072,42 +1095,45 @@ def process(self, # Calibrate detector channel energies based on fluorescence peaks. for detector in calibration_config.detectors: slope, intercept = self.calibrate( - calibration_config, detector, peak_energies, fit_index_ranges, - peak_index_fit_delta, max_energy_kev, save_figures, - interactive, outputdir) + calibration_config, detector, fit_index_ranges, + peak_energies, max_peak_index, peak_index_fit_delta, + max_energy_kev, save_figures, interactive, outputdir) detector.slope_initial_guess = slope detector.intercept_initial_guess = intercept return calibration_config.dict() - def calibrate(self, calibration_config, detector, peak_energies, - fit_index_ranges, peak_index_fit_delta, max_energy_kev, - save_figures, interactive, outputdir): + def calibrate(self, calibration_config, detector, fit_index_ranges, + peak_energies, max_peak_index, peak_index_fit_delta, + max_energy_kev, save_figures, interactive, outputdir): """Return calibrated slope & intercept for linearly converting the current detector's MCA channels to bin energies. :param calibration_config: Calibration configuration. :type calibration_config: MCACeriaCalibrationConfig :param detector: Configuration of the current detector. - :type calibration_config: MCAElementCalibrationConfig - :param peak_energies: Theoretical locations of peaks to use - for calibrating the MCA channel energies. It is _strongly_ - recommended to use fluorescence peaks. - :type peak_energies: list[float] + :type detector: MCAElementCalibrationConfig :param fit_index_ranges: Explicit ranges of uncalibrated MCA channel index ranges to include when performing a fit of the given peaks to the provied MCA spectrum. Use this parameter or select it interactively by running a pipeline with `config.interactive: True`. :type fit_ranges: list[list[float]] + :param peak_energies: Theoretical locations of peaks to use + for calibrating the MCA channel energies. It is _strongly_ + recommended to use fluorescence peaks. + :type peak_energies: list[float] + :param max_peak_index: Index of the peak in `peak_energies` + with the highest amplitude. + :type peak_energies: int :param peak_index_fit_delta: Set boundaries on the fit peak centers when performing the fit. The min/max possible values for the peak centers will be the initial values - ± `peak_index_fit_delta`. Defaults to `20`. + ± `peak_index_fit_delta`. :type peak_index_fit_delta: int :param max_energy_kev: Maximum channel energy of the MCA in keV, defaults to 200.0. - :type max_energy_kev: float, optional + :type max_energy_kev: float :param save_figures: Save .pngs of plots for checking inputs & outputs of this Processor. :type save_figures: bool @@ -1151,7 +1177,7 @@ def calibrate(self, calibration_config, detector, peak_energies, mask, fit_index_ranges = select_mask_1d( spectrum, x=bins, preselected_index_ranges=fit_index_ranges, xlabel='Detector channel', ylabel='Intensity', - min_num_index_ranges=1, interactive=interactive, + min_num_index_ranges=1, interactive=False,#RV interactive, filename=filename) self.logger.debug( f'Selected index ranges to fit: {fit_index_ranges}') @@ -1167,9 +1193,8 @@ def calibrate(self, calibration_config, detector, peak_energies, input_indices = [index_nearest(uncalibrated_energies, energy) for energy in peak_energies] initial_peak_indices = self._get_initial_peak_positions( - spectrum, input_indices, index_ranges=fit_index_ranges, - interactive=interactive, filename=filename, - detector_name=detector.detector_name) + spectrum*mask.astype(np.int32), fit_index_ranges, input_indices, + max_peak_index, interactive, filename, detector.detector_name) spectrum_fit = Fit(spectrum[mask], x=bins[mask]) for i, index in enumerate(initial_peak_indices): @@ -1230,8 +1255,7 @@ def calibrate(self, calibration_config, detector, peak_energies, axs[1].text( 0.98, 0.02, 'Calibrated Values:\n\n' - + f'Slope:\n {slope:.5f} $keV$/channel\n\n' - + f'Intercept:\n {intercept:.5f} $keV$', + f'Slope:\n {slope:.5f}\n\n Intercept:\n {intercept:.5f}', ha='right', va='bottom', ma='left', transform=axs[1].transAxes, bbox=dict(boxstyle='round', @@ -1252,18 +1276,12 @@ def calibrate(self, calibration_config, detector, peak_energies, return float(slope), float(intercept) def _get_initial_peak_positions( - self, y, input_indices, index_ranges=None, interactive=True, - filename=None, detector_name=None, reset_flag=0): + self, y, index_ranges, input_indices, input_max_peak_index, + interactive, filename, detector_name, reset_flag=0): # Third party modules import matplotlib.pyplot as plt from matplotlib.widgets import TextBox, Button - # Local modules - from CHAP.utils.general import ( - is_int_pair, - is_int_series, - ) - def change_fig_title(title): if fig_title: fig_title[0].remove() @@ -1288,51 +1306,30 @@ def confirm(event): error_texts.pop() plt.close() - def find_peaks(): + def find_peaks(min_height=0.05, min_width=5, tolerance=0.05): # Third party modules from scipy.signal import find_peaks as find_peaks_scipy - yy = y[index_ranges[0][0]:index_ranges[-1][1]] - prominence = 0.01*yy.max() - indices = find_peaks_scipy(yy, prominence=prominence)[0] - while prominence < 0.5*yy.max() and len(indices) > num_peak: - prominence *= 2 - indices = find_peaks_scipy(yy, prominence=prominence)[0] - if len(indices) < 2: - return input_indices - peak_indices = [index + index_ranges[0][0] for index in indices] - if len(peak_indices) < num_peak: - i = 0 - ii = 0 - for index in peak_indices[:-1]: - shifted_input_indices = [ - input_index + index - input_indices[ii] - for input_index in input_indices] - peak_delta = peak_indices[i+1] - index - input_delta1 = ( - shifted_input_indices[ii+1] - - shifted_input_indices[ii]) - input_delta2 = ( - shifted_input_indices[ii+2] - - shifted_input_indices[ii+1]) - if (abs(peak_delta-input_delta1) - < abs(peak_delta-input_delta2)): - i += 1 - ii += 1 - else: - peak_indices[i] = ( - peak_indices[i+1] - input_delta1 - input_delta2) - peak_indices.insert( - i+1, peak_indices[i] + input_delta1) - i += 2 - ii += 2 - if len(peak_indices) == len(input_indices): - break - if len(peak_indices) < len(input_indices): - input_delta = \ - shifted_input_indices[-1] - shifted_input_indices[-2] - peak_indices[-1] -= input_delta//2 - peak_indices.append(peak_indices[-1] + input_delta) + # Find peaks + peaks = find_peaks_scipy(y, height=min_height, + prominence=0.05*y.max(), width=min_width) + available_peak_indices = list(peaks[0]) + max_peak_index = np.asarray(peaks[1]).argmax() + ratio = (available_peak_indices[max_peak_index] + / input_indices[input_max_peak_index]) + peak_indices = [-1]*num_peak + peak_indices[input_max_peak_index] = \ + available_peak_indices[max_peak_index] + available_peak_indices.pop(max_peak_index) + for i, input_index in enumerate(input_indices): + if i != input_max_peak_index: + index_guess = int(input_index * ratio) + for index in available_peak_indices.copy(): + if abs(index_guess-index) < tolerance*index: + index_guess = index + available_peak_indices.remove(index) + break + peak_indices[i] = index_guess return peak_indices def select_peaks(): @@ -1373,18 +1370,11 @@ def select_peaks(): plt.draw() return peak_indices - # Check inputs + peak_indices = [] + fig_title = [] + error_texts = [] + y = np.asarray(y) - if not is_int_series(input_indices, gt=0, lt=y.size): - raise ValueError( - f'Invalid parameter `input_indices`: {input_indices}') - if index_ranges is None: - index_ranges = [[0, y.size]] - elif (not isinstance(index_ranges, list) - or any(not is_int_pair(v, ge=0, le=y.size) - for v in index_ranges)): - raise ValueError( - f'Invalid parameter `index_ranges`: {index_ranges}') if detector_name is None: detector_name = '' elif not isinstance(detector_name, str): @@ -1392,11 +1382,7 @@ def select_peaks(): f'Invalid parameter `detector_name`: {detector_name}') elif not reset_flag: detector_name = f' on detector {detector_name}' - num_peak = len(input_indices) - peak_indices = [] - fig_title = [] - error_texts = [] # Setup the Matplotlib figure title_pos = (0.5, 0.95) @@ -1481,9 +1467,8 @@ def select_peaks(): if interactive and len(peak_indices) != num_peak: reset_flag += 1 return self._get_initial_peak_positions( - y, input_indices, index_ranges=index_ranges, - interactive=interactive, filename=filename, - detector_name=detector_name, reset_flag=reset_flag) + y, index_ranges, input_indices, max_peak_index,interactive, + filename, detector_name, reset_flag=reset_flag) return peak_indices