Skip to content

Commit

Permalink
fix: more robust peak finding in EDD energy calibration
Browse files Browse the repository at this point in the history
  • Loading branch information
rolfverberg committed Feb 29, 2024
1 parent cccc1ed commit 0fb5624
Showing 1 changed file with 87 additions and 102 deletions.
189 changes: 87 additions & 102 deletions CHAP/edd/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,8 +927,8 @@ def calibrate(self,
txt = 'Calibrated Values:\n\n' \
+ f'Takeoff Angle:\n {tth:.5f}$^\circ$'
if True or recalibrate_energy:
txt += f'\n\nSlope:\n {slope:.5f}\n\n' \
+ f'Intercept:\n {intercept:.5f} $keV$'
txt += f'\n\nSlope:\n {slope:.5f}\n\n' \
f'Intercept:\n {intercept:.5f}'
axs[1,1].text(
0.98, 0.02, txt,
ha='right', va='bottom', ma='left',
Expand Down Expand Up @@ -969,6 +969,7 @@ class MCAEnergyCalibrationProcessor(Processor):
def process(self,
data,
peak_energies,
max_peak_index,
config=None,
fit_index_ranges=None,
peak_index_fit_delta=1.0,
Expand All @@ -993,7 +994,10 @@ def process(self,
:param peak_energies: Theoretical locations of peaks to use
for calibrating the MCA channel energies. It is _strongly_
recommended to use fluorescence peaks.
:type peak_energies: list[float]
:type peak_energies: list[float], optional
:param max_peak_index: Index of the peak in `peak_energies`
with the highest amplitude
:type max_peak_index: int
:param config: Initialization parameters for an instance of
CHAP.edd.models.MCACeriaCalibrationConfig, defaults to
`None`.
Expand All @@ -1002,13 +1006,13 @@ def process(self,
channel index ranges to include when performing a fit of
the given peaks to the provied MCA spectrum. Use this
parameter or select it interactively by running a pipeline
with `config.interactive: True`. Defaults to `None`.
:type fit_index_ranges: Optional[list[list[int]]]
with `config.interactive: True`.
:type fit_index_ranges: list[list[int]], optional
:param peak_index_fit_delta: Set boundaries on the fit peak
centers when performing the fit. The min/max possible
values for the peak centers will be the initial values
± `peak_index_fit_delta`. Defaults to `20`.
:type peak_index_fit_delta: int
:type peak_index_fit_delta: int, optional
:param max_energy_kev: Maximum channel energy of the MCA in
keV, defaults to 200.0.
:type max_energy_kev: float, optional
Expand All @@ -1030,27 +1034,46 @@ def process(self,
:rtype: dict
"""
# Local modules
from CHAP.utils.general import is_num_series
from CHAP.utils.general import (
is_int,
is_int_pair,
is_num_series,
)

# Validate arguments: peak_energies
# Validate arguments: fit_ranges & interactive
if not is_num_series(peak_energies, gt=0):
self.logger.exception(
ValueError('Invalid parameter `peak_energies`: '
f'{peak_energies}'), exc_info=False)
ValueError(
f'Invalid parameter `peak_energies`: {peak_energies}'),
exc_info=False)
if len(peak_energies) < 2:
self.logger.exception(
ValueError('Invalid parameter `peak_energies`: '
f'{peak_energies} (at least two values required)'),
exc_info=False)
peak_energies.sort()

# Validate arguments: fit_ranges & interactive
exc_info=False)
if not is_int(max_peak_index, ge=0, lt=len(peak_energies)):
self.logger.exception(
ValueError(
f'Invalid parameter `max_peak_index`: {max_peak_index}'),
exc_info=False)
if fit_index_ranges is None and not interactive:
self.logger.exception(
RuntimeError(
'If `fit_index_ranges` is not explicitly provided, '
+ self.__class__.__name__
+ ' must be run with `interactive=True`.'), exc_info=False)
+ ' must be run with `interactive=True`.'),
exc_info=False)
if (fit_index_ranges is not None
and (not isinstance(fit_index_ranges, list)
or any(not is_int_pair(v, ge=0)
for v in fit_index_ranges))):
self.logger.exception(
ValueError('Invalid parameter `fit_index_ranges`: '
f'{fit_index_ranges}'),
exc_info=False)
max_peak_energy = peak_energies[max_peak_index]
peak_energies.sort()
max_peak_index = peak_energies.index(max_peak_energy)

# Validate arguments: load the calibration configuration
try:
Expand All @@ -1072,42 +1095,45 @@ def process(self,
# Calibrate detector channel energies based on fluorescence peaks.
for detector in calibration_config.detectors:
slope, intercept = self.calibrate(
calibration_config, detector, peak_energies, fit_index_ranges,
peak_index_fit_delta, max_energy_kev, save_figures,
interactive, outputdir)
calibration_config, detector, fit_index_ranges,
peak_energies, max_peak_index, peak_index_fit_delta,
max_energy_kev, save_figures, interactive, outputdir)
detector.slope_initial_guess = slope
detector.intercept_initial_guess = intercept

return calibration_config.dict()

def calibrate(self, calibration_config, detector, peak_energies,
fit_index_ranges, peak_index_fit_delta, max_energy_kev,
save_figures, interactive, outputdir):
def calibrate(self, calibration_config, detector, fit_index_ranges,
peak_energies, max_peak_index, peak_index_fit_delta,
max_energy_kev, save_figures, interactive, outputdir):
"""Return calibrated slope & intercept for linearly converting
the current detector's MCA channels to bin energies.
:param calibration_config: Calibration configuration.
:type calibration_config: MCACeriaCalibrationConfig
:param detector: Configuration of the current detector.
:type calibration_config: MCAElementCalibrationConfig
:param peak_energies: Theoretical locations of peaks to use
for calibrating the MCA channel energies. It is _strongly_
recommended to use fluorescence peaks.
:type peak_energies: list[float]
:type detector: MCAElementCalibrationConfig
:param fit_index_ranges: Explicit ranges of uncalibrated MCA
channel index ranges to include when performing a fit of
the given peaks to the provied MCA spectrum. Use this
parameter or select it interactively by running a pipeline
with `config.interactive: True`.
:type fit_ranges: list[list[float]]
:param peak_energies: Theoretical locations of peaks to use
for calibrating the MCA channel energies. It is _strongly_
recommended to use fluorescence peaks.
:type peak_energies: list[float]
:param max_peak_index: Index of the peak in `peak_energies`
with the highest amplitude.
:type peak_energies: int
:param peak_index_fit_delta: Set boundaries on the fit peak
centers when performing the fit. The min/max possible
values for the peak centers will be the initial values
&pm; `peak_index_fit_delta`. Defaults to `20`.
&pm; `peak_index_fit_delta`.
:type peak_index_fit_delta: int
:param max_energy_kev: Maximum channel energy of the MCA in
keV, defaults to 200.0.
:type max_energy_kev: float, optional
:type max_energy_kev: float
:param save_figures: Save .pngs of plots for checking inputs &
outputs of this Processor.
:type save_figures: bool
Expand Down Expand Up @@ -1151,7 +1177,7 @@ def calibrate(self, calibration_config, detector, peak_energies,
mask, fit_index_ranges = select_mask_1d(
spectrum, x=bins, preselected_index_ranges=fit_index_ranges,
xlabel='Detector channel', ylabel='Intensity',
min_num_index_ranges=1, interactive=interactive,
min_num_index_ranges=1, interactive=False,#RV interactive,
filename=filename)
self.logger.debug(
f'Selected index ranges to fit: {fit_index_ranges}')
Expand All @@ -1167,9 +1193,8 @@ def calibrate(self, calibration_config, detector, peak_energies,
input_indices = [index_nearest(uncalibrated_energies, energy)
for energy in peak_energies]
initial_peak_indices = self._get_initial_peak_positions(
spectrum, input_indices, index_ranges=fit_index_ranges,
interactive=interactive, filename=filename,
detector_name=detector.detector_name)
spectrum*mask.astype(np.int32), fit_index_ranges, input_indices,
max_peak_index, interactive, filename, detector.detector_name)

spectrum_fit = Fit(spectrum[mask], x=bins[mask])
for i, index in enumerate(initial_peak_indices):
Expand Down Expand Up @@ -1230,8 +1255,7 @@ def calibrate(self, calibration_config, detector, peak_energies,
axs[1].text(
0.98, 0.02,
'Calibrated Values:\n\n'
+ f'Slope:\n {slope:.5f} $keV$/channel\n\n'
+ f'Intercept:\n {intercept:.5f} $keV$',
f'Slope:\n {slope:.5f}\n\n Intercept:\n {intercept:.5f}',
ha='right', va='bottom', ma='left',
transform=axs[1].transAxes,
bbox=dict(boxstyle='round',
Expand All @@ -1252,18 +1276,12 @@ def calibrate(self, calibration_config, detector, peak_energies,
return float(slope), float(intercept)

def _get_initial_peak_positions(
self, y, input_indices, index_ranges=None, interactive=True,
filename=None, detector_name=None, reset_flag=0):
self, y, index_ranges, input_indices, input_max_peak_index,
interactive, filename, detector_name, reset_flag=0):
# Third party modules
import matplotlib.pyplot as plt
from matplotlib.widgets import TextBox, Button

# Local modules
from CHAP.utils.general import (
is_int_pair,
is_int_series,
)

def change_fig_title(title):
if fig_title:
fig_title[0].remove()
Expand All @@ -1288,51 +1306,30 @@ def confirm(event):
error_texts.pop()
plt.close()

def find_peaks():
def find_peaks(min_height=0.05, min_width=5, tolerance=0.05):
# Third party modules
from scipy.signal import find_peaks as find_peaks_scipy

yy = y[index_ranges[0][0]:index_ranges[-1][1]]
prominence = 0.01*yy.max()
indices = find_peaks_scipy(yy, prominence=prominence)[0]
while prominence < 0.5*yy.max() and len(indices) > num_peak:
prominence *= 2
indices = find_peaks_scipy(yy, prominence=prominence)[0]
if len(indices) < 2:
return input_indices
peak_indices = [index + index_ranges[0][0] for index in indices]
if len(peak_indices) < num_peak:
i = 0
ii = 0
for index in peak_indices[:-1]:
shifted_input_indices = [
input_index + index - input_indices[ii]
for input_index in input_indices]
peak_delta = peak_indices[i+1] - index
input_delta1 = (
shifted_input_indices[ii+1]
- shifted_input_indices[ii])
input_delta2 = (
shifted_input_indices[ii+2]
- shifted_input_indices[ii+1])
if (abs(peak_delta-input_delta1)
< abs(peak_delta-input_delta2)):
i += 1
ii += 1
else:
peak_indices[i] = (
peak_indices[i+1] - input_delta1 - input_delta2)
peak_indices.insert(
i+1, peak_indices[i] + input_delta1)
i += 2
ii += 2
if len(peak_indices) == len(input_indices):
break
if len(peak_indices) < len(input_indices):
input_delta = \
shifted_input_indices[-1] - shifted_input_indices[-2]
peak_indices[-1] -= input_delta//2
peak_indices.append(peak_indices[-1] + input_delta)
# Find peaks
peaks = find_peaks_scipy(y, height=min_height,
prominence=0.05*y.max(), width=min_width)
available_peak_indices = list(peaks[0])
max_peak_index = np.asarray(peaks[1]).argmax()
ratio = (available_peak_indices[max_peak_index]
/ input_indices[input_max_peak_index])
peak_indices = [-1]*num_peak
peak_indices[input_max_peak_index] = \
available_peak_indices[max_peak_index]
available_peak_indices.pop(max_peak_index)
for i, input_index in enumerate(input_indices):
if i != input_max_peak_index:
index_guess = int(input_index * ratio)
for index in available_peak_indices.copy():
if abs(index_guess-index) < tolerance*index:
index_guess = index
available_peak_indices.remove(index)
break
peak_indices[i] = index_guess
return peak_indices

def select_peaks():
Expand Down Expand Up @@ -1373,30 +1370,19 @@ def select_peaks():
plt.draw()
return peak_indices

# Check inputs
peak_indices = []
fig_title = []
error_texts = []

y = np.asarray(y)
if not is_int_series(input_indices, gt=0, lt=y.size):
raise ValueError(
f'Invalid parameter `input_indices`: {input_indices}')
if index_ranges is None:
index_ranges = [[0, y.size]]
elif (not isinstance(index_ranges, list)
or any(not is_int_pair(v, ge=0, le=y.size)
for v in index_ranges)):
raise ValueError(
f'Invalid parameter `index_ranges`: {index_ranges}')
if detector_name is None:
detector_name = ''
elif not isinstance(detector_name, str):
raise ValueError(
f'Invalid parameter `detector_name`: {detector_name}')
elif not reset_flag:
detector_name = f' on detector {detector_name}'

num_peak = len(input_indices)
peak_indices = []
fig_title = []
error_texts = []

# Setup the Matplotlib figure
title_pos = (0.5, 0.95)
Expand Down Expand Up @@ -1481,9 +1467,8 @@ def select_peaks():
if interactive and len(peak_indices) != num_peak:
reset_flag += 1
return self._get_initial_peak_positions(
y, input_indices, index_ranges=index_ranges,
interactive=interactive, filename=filename,
detector_name=detector_name, reset_flag=reset_flag)
y, index_ranges, input_indices, max_peak_index,interactive,
filename, detector_name, reset_flag=reset_flag)
return peak_indices


Expand Down

0 comments on commit 0fb5624

Please sign in to comment.