diff --git a/CHAP/edd/__init__.py b/CHAP/edd/__init__.py index 917410d..692cf47 100755 --- a/CHAP/edd/__init__.py +++ b/CHAP/edd/__init__.py @@ -7,7 +7,8 @@ UpdateNXdataReader) from CHAP.edd.processor import (DiffractionVolumeLengthProcessor, LatticeParameterRefinementProcessor, - MCACeriaCalibrationProcessor, + MCAEnergyCalibrationProcessor, + MCATthCalibrationProcessor, MCADataProcessor, MCAEnergyCalibrationProcessor, MCACalibratedDataPlotter, diff --git a/CHAP/edd/models.py b/CHAP/edd/models.py index f709d80..31fd289 100755 --- a/CHAP/edd/models.py +++ b/CHAP/edd/models.py @@ -14,6 +14,7 @@ BaseModel, DirectoryPath, FilePath, + PrivateAttr, StrictBool, confloat, conint, @@ -103,25 +104,6 @@ def dict(self, *args, **kwargs): return d -class CeriaConfig(MaterialConfig): - """Model for the sample material used in calibrations. - - :ivar material_name: Calibration material name, - defaults to `'CeO2'`. - :type material_name: str, optional - :ivar lattice_parameters: Lattice spacing(s) for the calibration - material in angstroms, defaults to `5.41153`. - :type lattice_parameters: float, list[float], optional - :ivar sgnum: Space group of the calibration material, - defaults to `225`. - :type sgnum: int, optional - """ - #RV Name suggests it's always Ceria, why have material_name? - material_name: constr(strip_whitespace=True, min_length=1) = 'CeO2' - lattice_parameters: confloat(gt=0) = 5.41153 - sgnum: Optional[conint(ge=0)] = 225 - - # Detector configuration classes class MCAElementConfig(BaseModel): @@ -156,18 +138,15 @@ class MCAElementCalibrationConfig(MCAElementConfig): defaults to `90`. :type tth_max: float, optional :ivar hkl_tth_tol: Minimum resolvable difference in 2&theta between - two unique HKL peaks, defaults to `0.15`. + two unique Bragg peaks, defaults to `0.15`. :type hkl_tth_tol: float, optional - :ivar hkl_indices: List of unique HKL indices to fit peaks for in - the calibration routine, defaults to `[]`. - :type hkl_indices: list[int], optional - :ivar background: Background model for peak fitting. - :type background: str, list[str], optional :ivar energy_calibration_coeffs: Detector channel index to energy polynomial conversion coefficients ([a, b, c] with E_i = a*i^2 + b*i + c), defaults to `[0, 0, 1]`. :type energy_calibration_coeffs: list[float, float, float], optional + :ivar background: Background model for peak fitting. + :type background: str, list[str], optional :ivar tth_initial_guess: Initial guess for 2&theta, defaults to `5.0`. :type tth_initial_guess: float, optional @@ -180,13 +159,11 @@ class MCAElementCalibrationConfig(MCAElementConfig): """ tth_max: confloat(gt=0, allow_inf_nan=False) = 90.0 hkl_tth_tol: confloat(gt=0, allow_inf_nan=False) = 0.15 - hkl_indices: Optional[conlist(item_type=conint(ge=0))] = [] - background: Optional[Union[str, list]] - tth_initial_guess: confloat(gt=0, le=tth_max, allow_inf_nan=False) = 5.0 energy_calibration_coeffs: conlist( min_items=3, max_items=3, item_type=confloat(allow_inf_nan=False)) = [0, 0, 1] - intercept_initial_guess: Optional[confloat(allow_inf_nan=False)] + background: Optional[Union[str, list]] + tth_initial_guess: confloat(gt=0, le=tth_max, allow_inf_nan=False) = 5.0 tth_calibrated: Optional[confloat(gt=0, allow_inf_nan=False)] include_energy_ranges: conlist( min_items=1, @@ -195,6 +172,8 @@ class MCAElementCalibrationConfig(MCAElementConfig): min_items=2, max_items=2)) = [[50, 150]] + _hkl_indices: list = PrivateAttr() + @validator('include_energy_ranges', each_item=True) def validate_include_energy_range(cls, value, values): """Ensure that no energy ranges are outside the boundary of the @@ -222,15 +201,6 @@ def validate_include_energy_range(cls, value, values): value = newvalue return value - @validator('hkl_indices', pre=True) - def validate_hkl_indices(cls, hkl_indices): - if isinstance(hkl_indices, str): - # Local modules - from CHAP.utils.general import string_to_list - - hkl_indices = string_to_list(hkl_indices) - return sorted(hkl_indices) - @property def energies(self): """Return calibrated bin energies.""" @@ -256,6 +226,15 @@ def include_bin_ranges(self): index_nearest_up(energies, e_max)]) return include_bin_ranges + @property + def hkl_indices(self): + """Return the hkl_indices consistent with the selected energy + ranges (include_energy_ranges). + """ + if hasattr(self, '_hkl_indices'): + return self._hkl_indices + return [] + def get_include_energy_ranges(self, include_bin_ranges): """Given a list of channel index ranges, return the corresponding list of channel energy ranges. @@ -284,6 +263,10 @@ def mca_mask(self): mask, np.logical_and(bin_indices >= min_, bin_indices <= max_)) return mask + def set_hkl_indices(self, hkl_indices): + """Set the private attribute `hkl_indices`.""" + self._hkl_indices = hkl_indices + #RV need def dict? # d['include_energy_ranges'] = [ # [float(energy) for energy in d['include_energy_ranges'][i]] @@ -308,12 +291,6 @@ class MCAElementDiffractionVolumeLengthConfig(MCAElementConfig): :type dvl_measured: float, optional :ivar fit_amplitude: Placeholder for amplitude of the gaussian fit. :type fit_amplitude: float, optional - include_energy_ranges: conlist( - min_items=1, - item_type=conlist( - item_type=confloat(ge=25), - min_items=2, - max_items=2)) = [[50, 150]] :ivar fit_center: Placeholder for center of the gaussian fit. :type fit_center: float, optional :ivar fit_sigma: Placeholder for sigma of the gaussian fit. @@ -325,7 +302,6 @@ class MCAElementDiffractionVolumeLengthConfig(MCAElementConfig): fit_amplitude: Optional[float] = None fit_center: Optional[float] = None fit_sigma: Optional[float] = None - #RV FIX does this rely on include_energy_ranges def dict(self, *args, **kwargs): """Return a representation of this configuration in a @@ -572,7 +548,7 @@ class MCAScanDataConfig(BaseModel): scan_number: Optional[conint(gt=0)] par_file: Optional[FilePath] scan_column: Optional[str] - detectors: conlist(min_items=1, item_type=MCAElementConfig)#RV FIX does this rely on include_energy_ranges + detectors: conlist(min_items=1, item_type=MCAElementConfig) _parfile: Optional[ParFile] _scanparser: Optional[ScanParser] @@ -769,38 +745,54 @@ def scanned_vals(self): return self.scanparser.spec_scan_motor_vals[0] -class MCACeriaCalibrationConfig(MCAScanDataConfig): +class MCAEnergyCalibrationConfig(MCAScanDataConfig): """ - Class representing metadata required to perform a Ceria calibration - for an MCA detector. + Class representing metadata required to perform an energy + calibration for an MCA detector. :ivar scan_step_indices: Optional scan step indices to use for the calibration. If not specified, the calibration will be performed on the average of all MCA spectra for the scan. :type scan_step_indices: list[int], optional - :ivar flux_file: File name of the csv flux file containing station - beam energy in eV (column 0) versus flux (column 1). - :type flux_file: str, optional - :ivar material: Material configuration for Ceria. - :type material: CeriaConfig :ivar detectors: List of individual MCA detector element calibration configurations. :type detectors: list[MCAElementCalibrationConfig] - :ivar max_iter: Maximum number of iterations of the calibration - routine, defaults to `10`. - :type detectors: int, optional - :ivar tune_tth_tol: Cutoff error for tuning 2&theta. Stop iterating - the calibration routine after an iteration produces a change in - the tuned value of 2&theta that is smaller than this cutoff, - defaults to `1e-8`. - :ivar tune_tth_tol: float, optional + :ivar flux_file: File name of the csv flux file containing station + beam energy in eV (column 0) versus flux (column 1). + :type flux_file: str, optional + :ivar material: Material configuration for the calibration, + defaults to `Ceria`. + :type material: MaterialConfig, optional + :ivar peak_energies: Theoretical locations of peaks in keV to use + for calibrating the MCA channel energies. It is _strongly_ + recommended to use fluorescence peaks for the energy + calibration. + :type peak_energies: list[float] + :ivar max_peak_index: Index of the peak in `peak_energies` + with the highest amplitude. + :type max_peak_index: int + :ivar fit_index_ranges: Explicit ranges of uncalibrated MCA + channel index ranges to include during energy calibration + when the given peaks are fitted to the provied MCA spectrum. + Use this parameter or select it interactively by running a + pipeline with `config.interactive: True`. + :type fit_index_ranges: list[[int, int]], optional + """ scan_step_indices: Optional[conlist(min_items=1, item_type=conint(ge=0))] - material: CeriaConfig = CeriaConfig() detectors: conlist(min_items=1, item_type=MCAElementCalibrationConfig) flux_file: Optional[FilePath] - max_iter: conint(gt=0) = 10 - tune_tth_tol: confloat(ge=0) = 1e-8 + material: Optional[MaterialConfig] = MaterialConfig( + material_name='CeO2', lattice_parameters=5.41153, sgnum=225) + peak_energies: conlist(item_type=confloat(gt=0), min_items=2) + max_peak_index: conint(gt=0) + fit_index_ranges: Optional[ + conlist( + min_items=1, + item_type=conlist( + item_type=conint(ge=0), + min_items=2, + max_items=2))] @root_validator(pre=True) def validate_config(cls, values): @@ -832,7 +824,7 @@ def validate_scan_step_indices(cls, scan_step_indices, values): :type values: dict :raises ValueError: If a specified scan number is not found in the SPEC file. - :return: List of scan numbers. + :return: List of step indices. :rtype: list of int """ if isinstance(scan_step_indices, str): @@ -843,7 +835,25 @@ def validate_scan_step_indices(cls, scan_step_indices, values): scan_step_indices, raise_error=True) return scan_step_indices - @property + @validator('max_peak_index') + def validate_max_peak_index(cls, max_peak_index, values): + """Validate the specified index of the XRF peak with the + highest amplitude. + + :ivar max_peak_index: The index of the XRF peak with the + highest amplitude. + :type max_peak_index: int + :param values: Dictionary of validated class field values. + :type values: dict + :raises ValueError: Invalid max_peak_index. + :return: The validated value of `max_peak_index`. + :rtype: int + """ + peak_energies = values.get('peak_energies') + if not 0 <= max_peak_index < len(peak_energies): + raise ValueError('max_peak_index out of bounds') + return max_peak_index + def flux_file_energy_range(self): """Get the energy range in the flux corection file. @@ -889,7 +899,6 @@ def flux_correction_interpolation_function(self): :return: Energy flux correction interpolation function. :rtype: scipy.interpolate._polyint._Interpolator1D """ - if self.flux_file is None: return None flux = np.loadtxt(self.flux_file) @@ -899,6 +908,36 @@ def flux_correction_interpolation_function(self): return interpolation_function +class MCATthCalibrationConfig(MCAEnergyCalibrationConfig): + """ + Class representing metadata required to perform a tth calibration + for an MCA detector. + + :ivar max_iter: Maximum number of iterations of the calibration + routine, defaults to `10`. + :type max_iter: int, optional + :ivar tune_tth_tol: Cutoff error for tuning 2&theta. Stop iterating + the calibration routine after an iteration produces a change in + the tuned value of 2&theta that is smaller than this cutoff, + defaults to `1e-8`. + :ivar tune_tth_tol: float, optional + """ + max_iter: conint(gt=0) = 10 + tune_tth_tol: confloat(ge=0) = 1e-8 + + def flux_file_energy_range(self): + """Get the energy range in the flux corection file. + + :return: The energy range in the flux corection file. + :rtype: tuple(float, float) + """ + if self.flux_file is None: + return None + flux = np.loadtxt(self.flux_file) + energies = flux[:,0]/1.e3 + return energies.min(), energies.max() + + class StrainAnalysisConfig(BaseModel): """Class representing input parameters required to perform a strain analysis. diff --git a/CHAP/edd/processor.py b/CHAP/edd/processor.py index 20b7988..8d16f97 100755 --- a/CHAP/edd/processor.py +++ b/CHAP/edd/processor.py @@ -263,7 +263,7 @@ def process(self, contining refined values for the materials' lattice parameters.""" ceria_calibration_config = self.get_config( - data, 'edd.models.MCACeriaCalibrationConfig', inputdir=inputdir) + data, 'edd.models.MCATthCalibrationConfig', inputdir=inputdir) try: strain_analysis_config = self.get_config( data, 'edd.models.StrainAnalysisConfig', inputdir=inputdir) @@ -607,371 +607,257 @@ def get_spectra_fits( intensities, energies, peak_locations, detector) -class MCACeriaCalibrationProcessor(Processor): - """A Processor using a CeO2 scan to obtain tuned values for the - bragg diffraction angle and linear correction parameters for MCA - channel energies for an EDD experimental setup. - """ - +class MCAEnergyCalibrationProcessor(Processor): + """Processor to return parameters for linearly transforming MCA + channel indices to energies (in keV). Procedure: provide a + spectrum from the MCA element to be calibrated and the theoretical + location of at least one peak present in that spectrum (peak + locations must be given in keV). It is strongly recommended to use + the location of fluorescence peaks whenever possible, _not_ + diffraction peaks, as this Processor does not account for + 2&theta.""" def process(self, data, config=None, - quadratic_energy_calibration=False, + peak_index_fit_delta=1.0, + max_energy_kev=200.0, save_figures=False, + interactive=False, inputdir='.', - outputdir='.', - interactive=False): - """Return tuned values for 2&theta and linear correction - parameters for the MCA channel energies. + outputdir='.'): + """For each detector in the `MCAEnergyCalibrationConfig` + provided with `data`, fit the specified peaks in the MCA + spectrum specified. Using the difference between the provided + peak locations and the fit centers of those peaks, compute + the correction coefficients to convert uncalibrated MCA + channel energies to calibrated channel energies. For each + detector, set `energy_calibration_coeffs` in the calibration + config provided to these values and return the updated + configuration. - :param data: Input configuration for the raw data & tuning - procedure. - :type data: list[PipelineData] + :param data: An energy Calibration configuration. + :type data: PipelineData :param config: Initialization parameters for an instance of - CHAP.edd.models.MCACeriaCalibrationConfig, defaults to - None. + CHAP.edd.models.MCAEnergyCalibrationConfig, defaults to + `None`. :type config: dict, optional - :param quadratic_energy_calibration: Adds a quadratic term to - the detector channel index to energy conversion, defaults - to `False` (linear only). - :type quadratic_energy_calibration: bool, optional + :param peak_index_fit_delta: Set boundaries on the fit peak + centers when performing the fit. The min/max possible + values for the peak centers will be the initial values + ± `peak_index_fit_delta`. Defaults to `20`. + :type peak_index_fit_delta: int, optional + :param max_energy_kev: Maximum channel energy of the MCA in + keV, defaults to 200.0. + :type max_energy_kev: float, optional :param save_figures: Save .pngs of plots for checking inputs & - outputs of this Processor, defaults to False. + outputs of this Processor, defaults to `False`. :type save_figures: bool, optional - :param outputdir: Directory to which any output figures will - be saved, defaults to '.'. - :type outputdir: str, optional + :param interactive: Allows for user interactions, defaults to + `False`. + :type interactive: bool, optional :param inputdir: Input directory, used only if files in the input configuration are not absolute paths, - defaults to '.'. + defaults to `'.'`. :type inputdir: str, optional - :param interactive: Allows for user interactions, defaults to - False. - :type interactive: bool, optional - :raises RuntimeError: Invalid or missing input configuration. - :return: Original configuration with the tuned values for - 2&theta and the linear correction parameters added. - :rtype: dict[str,float] + :param outputdir: Directory to which any output figures will + be saved, defaults to `'.'`. + :type outputdir: str, optional + :returns: Dictionary representing the energy-calibrated + version of the ceria calibration configuration. + :rtype: dict """ + # Local modules + from CHAP.utils.general import ( + is_int, + is_int_pair, + is_num_series, + ) + + # Load the validated energy calibration configuration try: calibration_config = self.get_config( - data, 'edd.models.MCACeriaCalibrationConfig', + data, 'edd.models.MCAEnergyCalibrationConfig', inputdir=inputdir) except Exception as data_exc: self.logger.info('No valid calibration config in input pipeline ' 'data, using config parameter instead.') try: # Local modules - from CHAP.edd.models import MCACeriaCalibrationConfig + from CHAP.edd.models import MCAEnergyCalibrationConfig - calibration_config = MCACeriaCalibrationConfig( + calibration_config = MCAEnergyCalibrationConfig( **config, inputdir=inputdir) except Exception as dict_exc: raise RuntimeError from dict_exc - self.logger.debug(f'In process: save_figures = {save_figures}; ' - f'interactive = {interactive}') + # Validate the fit index range + if calibration_config.fit_index_ranges is None and not interactive: + self.logger.exception( + RuntimeError( + 'If `fit_index_ranges` is not explicitly provided, ' + + self.__class__.__name__ + + ' must be run with `interactive=True`.'), + exc_info=False) + # Calibrate detector channel energies based on fluorescence peaks. for detector in calibration_config.detectors: - tth, energy_calibration_coeffs = self.calibrate( - calibration_config, detector, - quadratic_energy_calibration=quadratic_energy_calibration, - save_figures=save_figures, interactive=interactive, - outputdir=outputdir) - detector.tth_calibrated = tth + energy_calibration_coeffs = self.calibrate( + calibration_config, detector, peak_index_fit_delta, + max_energy_kev, save_figures, interactive, outputdir) detector.energy_calibration_coeffs = energy_calibration_coeffs return calibration_config.dict() - def calibrate(self, - calibration_config, - detector, - quadratic_energy_calibration=False, - save_figures=False, - outputdir='.', - interactive=False): - """Iteratively calibrate 2&theta by fitting selected peaks of - an MCA spectrum until the computed strain is sufficiently - small. Use the fitted peak locations to determine linear - correction parameters for the MCA channel energies. + def calibrate(self, calibration_config, detector, peak_index_fit_delta, + max_energy_kev, save_figures, interactive, outputdir): + """Return energy_calibration_coeffs (a, b, and c) for + quadratically converting the current detector's MCA channels + to bin energies. - :param calibration_config: Object configuring the CeO2 - calibration procedure for an MCA detector. - :type calibration_config: - CHAP.edd.models.MCACeriaCalibrationConfig - :param detector: A single MCA detector element configuration. - :type detector: CHAP.edd.models.MCAElementCalibrationConfig - :param quadratic_energy_calibration: Adds a quadratic term to - the detector channel index to energy conversion, defaults - to `False` (linear only). - :type quadratic_energy_calibration: bool, optional + :param calibration_config: Energy calibration configuration. + :type calibration_config: MCAEnergyCalibrationConfig + :param detector: Configuration of the current detector. + :type detector: MCAElementCalibrationConfig + :param peak_index_fit_delta: Set boundaries on the fit peak + centers when performing the fit. The min/max possible + values for the peak centers will be the initial values + ± `peak_index_fit_delta`. + :type peak_index_fit_delta: int + :param max_energy_kev: Maximum channel energy of the MCA in + keV, defaults to 200.0. + :type max_energy_kev: float :param save_figures: Save .pngs of plots for checking inputs & - outputs of this Processor, defaults to False. - :type save_figures: bool, optional + outputs of this Processor. + :type save_figures: bool + :param interactive: Allows for user interactions. + :type interactive: bool :param outputdir: Directory to which any output figures will - be saved, defaults to '.'. - :type outputdir: str, optional - :param interactive: Allows for user interactions, defaults to - False. - :type interactive: bool, optional - :raises ValueError: No value provided for included bin ranges - or the fitted HKLs for the MCA detector element. - :return: Calibrated values of 2&theta and the correction - parameters for MCA channel energies: tth, and - energy_calibration_coeffs. - :rtype: float, [float, float, float] + be saved. + :type outputdir: str + :returns: Slope and intercept for linearly correcting the + detector's MCA channels to bin energies. + :rtype: tuple[float, float] """ # Local modules - if interactive or save_figures: - from CHAP.edd.utils import ( - select_tth_initial_guess, - select_mask_and_hkls, - ) - from CHAP.edd.utils import get_peak_locations from CHAP.utils.fit import Fit + from CHAP.utils.general import ( + index_nearest, + index_nearest_down, + index_nearest_up, + select_mask_1d, + ) - # Get the unique HKLs and lattice spacings for the calibration - # material - hkls, ds = calibration_config.material.unique_hkls_ds( - tth_tol=detector.hkl_tth_tol, tth_max=detector.tth_max) - - # Collect raw MCA data of interest - mca_bin_energies = detector.energies - mca_data = calibration_config.mca_data(detector) + self.logger.debug(f'Calibrating detector {detector.detector_name}') + spectrum = calibration_config.mca_data(detector) + uncalibrated_energies = np.linspace( + 0, max_energy_kev, detector.num_bins) + bins = np.arange(detector.num_bins, dtype=np.int16) - # Blank out data below 25 keV as well as the last bin - energy_mask = np.where(mca_bin_energies >= 25.0, 1, 0) + # Blank out data below bin 500 (~25keV) as well as the last bin + energy_mask = np.ones(detector.num_bins, dtype=np.int16) + energy_mask[:500] = 0 energy_mask[-1] = 0 - mca_data = mca_data*energy_mask + spectrum = spectrum*energy_mask - # Adjust initial tth guess + # Select the mask/detector channel ranges for fitting if save_figures: filename = os.path.join( - outputdir, - f'{detector.detector_name}_calibration_tth_initial_guess.png') + outputdir, + f'{detector.detector_name}_mca_energy_calibration_mask.png') else: filename = None - detector.tth_initial_guess = select_tth_initial_guess( - mca_bin_energies, mca_data, hkls, ds, - detector.tth_initial_guess, interactive, filename) - self.logger.debug(f'tth_initial_guess = {detector.tth_initial_guess}') + mask, fit_index_ranges = select_mask_1d( + spectrum, x=bins, + preselected_index_ranges=calibration_config.fit_index_ranges, + xlabel='Detector channel', ylabel='Intensity', + min_num_index_ranges=1, interactive=False,#RV interactive, + filename=filename) + self.logger.debug( + f'Selected index ranges to fit: {fit_index_ranges}') - # Select mask & HKLs for fitting + # Get the intial peak positions for fitting + max_peak_energy = calibration_config.peak_energies[ + calibration_config.max_peak_index] + peak_energies = list(np.sort(calibration_config.peak_energies)) + max_peak_index = peak_energies.index(max_peak_energy) if save_figures: filename = os.path.join( outputdir, - f'{detector.detector_name}_calibration_fit_mask_hkls.png') - include_bin_ranges, hkl_indices = select_mask_and_hkls( - mca_bin_energies, mca_data, hkls, ds, - detector.tth_initial_guess, detector.include_bin_ranges, - detector.hkl_indices, detector.detector_name, - flux_energy_range=calibration_config.flux_file_energy_range, - label='MCA data', interactive=interactive, filename=filename) - detector.include_energy_ranges = detector.get_include_energy_ranges( - include_bin_ranges) - detector.hkl_indices = hkl_indices - self.logger.debug( - f'include_energy_ranges = {detector.include_energy_ranges}') - if not detector.include_energy_ranges: - raise ValueError( - 'No value provided for include_energy_ranges. ' - 'Provide them in the MCA Ceria Calibration Configuration ' - 'or re-run the pipeline with the --interactive flag.') - if not detector.hkl_indices: - raise ValueError( - 'No value provided for hkl_indices. Provide them in ' - 'the detector\'s MCA Ceria Calibration Configuration or ' - 're-run the pipeline with the --interactive flag.') - mca_mask = detector.mca_mask() - fit_mca_energies = mca_bin_energies[mca_mask] - fit_mca_intensities = mca_data[mca_mask] + f'{detector.detector_name}' + '_mca_energy_calibration_initial_peak_positions.png') + else: + filename = None + input_indices = [index_nearest(uncalibrated_energies, energy) + for energy in peak_energies] + initial_peak_indices = self._get_initial_peak_positions( + spectrum*mask.astype(np.int32), fit_index_ranges, input_indices, + max_peak_index, interactive, filename, detector.detector_name) - # Correct raw MCA data for variable flux at different energies - flux_correct = \ - calibration_config.flux_correction_interpolation_function() - if flux_correct is not None: - mca_intensity_weights = flux_correct(fit_mca_energies) - fit_mca_intensities = fit_mca_intensities / mca_intensity_weights + spectrum_fit = Fit(spectrum[mask], x=bins[mask]) + for i, index in enumerate(initial_peak_indices): + spectrum_fit.add_model( + 'gaussian', prefix=f'peak{i+1}_', parameters=( + {'name': 'amplitude', 'min': 0.0}, + {'name': 'center', 'value': index, + 'min': index - peak_index_fit_delta, + 'max': index + peak_index_fit_delta}, + {'name': 'sigma', 'value': 1.0, 'min': 0.2, 'max': 8.0}, + )) + self.logger.debug('Fitting spectrum') + spectrum_fit.fit() + + fit_peak_indices = sorted([ + spectrum_fit.best_values[f'peak{i+1}_center'] + for i in range(len(initial_peak_indices))]) + self.logger.debug(f'Fit peak centers: {fit_peak_indices}') - # Get the HKLs and lattice spacings that will be used for - # fitting - # Restrict the range for the centers with some margin to - # prevent having centers near the edge of the fitting range - delta = 0.1 * (fit_mca_energies[-1]-fit_mca_energies[0]) - centers_range = ( - max(0.0, fit_mca_energies[0]-delta), fit_mca_energies[-1]+delta) - fit_hkls = np.asarray([hkls[i] for i in detector.hkl_indices]) - fit_ds = np.asarray([ds[i] for i in detector.hkl_indices]) - c_1 = fit_hkls[:,0]**2 + fit_hkls[:,1]**2 + fit_hkls[:,2]**2 - tth = float(detector.tth_initial_guess) - fit_E0 = get_peak_locations(fit_ds, tth) - for iter_i in range(calibration_config.max_iter): - self.logger.debug( - f'Tuning tth: iteration no. {iter_i}, starting value = {tth} ') - - # Perform the uniform fit first - - # Get expected peak energy locations for this iteration's - # starting value of tth - _fit_E0 = get_peak_locations(fit_ds, tth) - - # Run the uniform fit - fit = Fit(fit_mca_intensities, x=fit_mca_energies) - fit.create_multipeak_model( - _fit_E0, fit_type='uniform', background=detector.background, - centers_range=centers_range, fwhm_min=0.1, fwhm_max=1.0) - fit.fit() - - # Extract values of interest from the best values for the - # uniform fit parameters - uniform_best_fit = fit.best_fit - uniform_residual = fit.residual - uniform_fit_centers = [ - fit.best_values[f'peak{i+1}_center'] - for i in range(len(fit_hkls))] - uniform_a = fit.best_values['scale_factor'] - uniform_strain = np.log( - (uniform_a - / calibration_config.material.lattice_parameters)) # CeO2 is cubic, so this is fine here. - - # Next, perform the unconstrained fit - - # Use the peak parameters from the uniform fit as the - # initial guesses for peak locations in the unconstrained - # fit - fit.create_multipeak_model(fit_type='unconstrained') - fit.fit() - - # Extract values of interest from the best values for the - # unconstrained fit parameters - unconstrained_best_fit = fit.best_fit - unconstrained_residual = fit.residual - unconstrained_fit_centers = np.array( - [fit.best_values[f'peak{i+1}_center'] - for i in range(len(fit_hkls))]) - unconstrained_a = np.sqrt(c_1)*abs(get_peak_locations( - unconstrained_fit_centers, tth)) - unconstrained_strains = np.log( - (unconstrained_a - / calibration_config.material.lattice_parameters)) - unconstrained_strain = np.mean(unconstrained_strains) - unconstrained_tth = tth * (1.0 + unconstrained_strain) - - # Update tth for the next iteration of tuning - prev_tth = tth - tth = float(unconstrained_tth) - - # Stop tuning tth at this iteration if differences are - # small enough - if abs(tth - prev_tth) < calibration_config.tune_tth_tol: - break - - # Fit line to expected / computed peak locations from the last - # unconstrained fit. - a_init, b_init, c_init = detector.energy_calibration_coeffs - if quadratic_energy_calibration: - fit = Fit.fit_data( - _fit_E0, 'quadratic', x=unconstrained_fit_centers, - nan_policy='omit') - a_fit = fit.best_values['a'] - b_fit = fit.best_values['b'] - c_fit = fit.best_values['c'] - else: - fit = Fit.fit_data( - _fit_E0, 'linear', x=unconstrained_fit_centers, - nan_policy='omit') - a_fit = 0.0 - b_fit = fit.best_values['slope'] - c_fit = fit.best_values['intercept'] - a_final = float(b_init**2*a_fit) - b_final = float(2*b_init*c_init*a_fit + b_init*b_fit) - c_final = float(c_init**2*a_fit + c_init*b_fit + c_fit) + #RV for now stick with a linear energy correction + energy_fit = Fit.fit_data( + peak_energies, 'linear', x=fit_peak_indices, nan_policy='omit') + a = 0.0 + b = float(energy_fit.best_values['slope']) + c = float(energy_fit.best_values['intercept']) + # Reference plot to see fit results: if interactive or save_figures: - # Third party modules + # Third part modules import matplotlib.pyplot as plt - fig, axs = plt.subplots(2, 2, sharex='all', figsize=(11, 8.5)) + fig, axs = plt.subplots(1, 2, figsize=(11, 4.25)) fig.suptitle( - f'Detector {detector.detector_name} Ceria Calibration') - - # Upper left axes: Input data & best fits - axs[0,0].set_title('Ceria Calibration Fits') - axs[0,0].set_xlabel('Energy (keV)') - axs[0,0].set_ylabel('Intensity (a.u)') - for i, hkl_E in enumerate(fit_E0): - # KLS: annotate indicated HKLs w millier indices - axs[0,0].axvline(hkl_E, color='k', linestyle='--') - axs[0,0].text(hkl_E, 1, str(fit_hkls[i])[1:-1], - ha='right', va='top', rotation=90, - transform=axs[0,0].get_xaxis_transform()) - axs[0,0].plot(fit_mca_energies, uniform_best_fit, - label='Single strain') - axs[0,0].plot(fit_mca_energies, unconstrained_best_fit, - label='Unconstrained') - #axs[0,0].plot(fit_mca_energies, MISSING?, label='least squares') - axs[0,0].plot(fit_mca_energies, fit_mca_intensities, - label='Flux-corrected & wasked MCA data') - axs[0,0].legend() - - # Lower left axes: fit residuals - axs[1,0].set_title('Fit Residuals') - axs[1,0].set_xlabel('Energy (keV)') - axs[1,0].set_ylabel('Residual (a.u)') - axs[1,0].plot(fit_mca_energies, - uniform_residual, - label='Single strain') - axs[1,0].plot(fit_mca_energies, - unconstrained_residual, - label='Unconstrained') - axs[1,0].legend() - - # Upper right axes: E vs strain for each fit - axs[0,1].set_title('HKL Energy vs. Microstrain') - axs[0,1].set_xlabel('Energy (keV)') - axs[0,1].set_ylabel('Strain (\u03BC\u03B5)') - axs[0,1].axhline(uniform_strain * 1e6, - linestyle='--', label='Single strain') - axs[0,1].plot(fit_E0, unconstrained_strains * 1e6, - color='C1', marker='s', label='Unconstrained') - axs[0,1].axhline(unconstrained_strain * 1e6, - color='C1', linestyle='--', - label='Unconstrained: unweighted mean') - axs[0,1].legend() - - # Lower right axes: theoretical HKL E vs fit HKL E for - # each fit - axs[1,1].set_title('Theoretical vs. Fit HKL Energies') - axs[1,1].set_xlabel('Energy (keV)') - axs[1,1].set_ylabel('Energy (keV)') - axs[1,1].plot(fit_E0, uniform_fit_centers, - c='b', marker='o', ms=6, mfc='none', ls='', - label='Single strain') - axs[1,1].plot(fit_E0, unconstrained_fit_centers, - c='k', marker='+', ms=6, ls='', - label='Unconstrained') - if quadratic_energy_calibration: - axs[1,1].plot(fit_E0, (a_fit*_fit_E0 + b_fit)*_fit_E0 + c_fit, - color='C1', label='Unconstrained: quadratic fit') - else: - axs[1,1].plot(fit_E0, b_fit*_fit_E0 + c_fit, color='C1', - label='Unconstrained: linear fit') - axs[1,1].legend() - - # Add a text box showing final calibrated values - txt = 'Calibrated values:' \ - f'\nTakeoff angle:\n {tth:.5f}$^\circ$' - if True or recalibrate_energy: - if quadratic_energy_calibration: - txt += '\nQuadratic coefficient:' \ - f'\n {a_final:.5e} $keV$/channel$^2$' - txt += '\nLinear coefficient:' \ - f'\n {b_final:.5f} $keV$/channel' \ - f'\nConstant offset:\n {c_final:.5f}' - axs[1,1].text( - 0.98, 0.02, txt, + f'Detector {detector.detector_name} Energy Calibration') + # Left plot: raw MCA data & best fit of peaks + axs[0].set_title(f'MCA Spectrum Peak Fit') + axs[0].set_xlabel('Detector channel') + axs[0].set_ylabel('Intensity (a.u)') + axs[0].plot(bins[mask], spectrum[mask], 'b.', label='MCA data') + axs[0].plot( + bins[mask], spectrum_fit.best_fit, 'r', label='Best fit') + axs[0].legend() + # Right plot: linear fit of theoretical peak energies vs + # fit peak locations + axs[1].set_title( + 'Channel Energies vs. Channel Indices') + axs[1].set_xlabel('Detector channel') + axs[1].set_ylabel('Channel energy (keV)') + axs[1].plot(fit_peak_indices, peak_energies, + c='b', marker='o', ms=6, mfc='none', ls='', + label='Initial peak positions') + axs[1].plot(fit_peak_indices, energy_fit.best_fit, + c='k', marker='+', ms=6, ls='', + label='Fitted peak positions') + axs[1].plot(bins[mask], b*bins[mask] + c, 'r', + label='Best linear fit') + axs[1].legend() + # Add text box showing computed values of linear E + # correction parameters + axs[1].text( + 0.98, 0.02, + 'Calibrated values:\n\n' + f'Linear coefficient:\n {b:.5f} $keV$/channel\n\n' + f'Constant offset:\n {c:.5f} $keV$', ha='right', va='bottom', ma='left', - transform=axs[1,1].transAxes, + transform=axs[1].transAxes, bbox=dict(boxstyle='round', ec=(1., 0.5, 0.5), fc=(1., 0.8, 0.8, 0.8))) @@ -981,349 +867,26 @@ def calibrate(self, if save_figures: figfile = os.path.join( outputdir, - f'{detector.detector_name}_ceria_calibration_fits.png') + f'{detector.detector_name}_energy_calibration_fit.png') plt.savefig(figfile) self.logger.info(f'Saved figure to {figfile}') if interactive: plt.show() - # Return the calibrated 2&theta value and the final energy - # calibration coefficients - return tth, [a_final, b_final, c_final] + return [a, b, c] + def _get_initial_peak_positions( + self, y, index_ranges, input_indices, input_max_peak_index, + interactive, filename, detector_name, reset_flag=0): + # Third party modules + import matplotlib.pyplot as plt + from matplotlib.widgets import TextBox, Button -class MCAEnergyCalibrationProcessor(Processor): - """Processor to return parameters for linearly transforming MCA - channel indices to energies (in keV). Procedure: provide a - spectrum from the MCA element to be calibrated and the theoretical - location of at least one peak present in that spectrum (peak - locations must be given in keV). It is strongly recommended to use - the location of fluorescence peaks whenever possible, _not_ - diffraction peaks, as this Processor does not account for - 2&theta.""" - def process(self, - data, - peak_energies, - max_peak_index, - config=None, - fit_index_ranges=None, - peak_index_fit_delta=1.0, - max_energy_kev=200.0, - save_figures=False, - interactive=False, - inputdir='.', - outputdir='.'): - """For each detector in the `MCACeriaCalibrationConfig` - provided with `data`, fit the specified peaks in the MCA - spectrum specified. Using the difference between the provided - peak locations and the fit centers of those peaks, compute - the correction coefficients to convert uncalibrated MCA - channel energies to calibrated channel energies. Set the - values in the calibration config provided for - `energy_calibration_coeffs` to these values (for each detector) - and return the updated configuration. - - :param data: A Ceria Calibration configuration. - :type data: PipelineData - :param peak_energies: Theoretical locations of peaks to use - for calibrating the MCA channel energies. It is _strongly_ - recommended to use fluorescence peaks. - :type peak_energies: list[float], optional - :param max_peak_index: Index of the peak in `peak_energies` - with the highest amplitude - :type max_peak_index: int - :param config: Initialization parameters for an instance of - CHAP.edd.models.MCACeriaCalibrationConfig, defaults to - `None`. - :type config: dict, optional - :param fit_index_ranges: Explicit ranges of uncalibrated MCA - channel index ranges to include when performing a fit of - the given peaks to the provied MCA spectrum. Use this - parameter or select it interactively by running a pipeline - with `config.interactive: True`. - :type fit_index_ranges: list[list[int]], optional - :param peak_index_fit_delta: Set boundaries on the fit peak - centers when performing the fit. The min/max possible - values for the peak centers will be the initial values - ± `peak_index_fit_delta`. Defaults to `20`. - :type peak_index_fit_delta: int, optional - :param max_energy_kev: Maximum channel energy of the MCA in - keV, defaults to 200.0. - :type max_energy_kev: float, optional - :param save_figures: Save .pngs of plots for checking inputs & - outputs of this Processor, defaults to `False`. - :type save_figures: bool, optional - :param interactive: Allows for user interactions, defaults to - `False`. - :type interactive: bool, optional - :param inputdir: Input directory, used only if files in the - input configuration are not absolute paths, - defaults to `'.'`. - :type inputdir: str, optional - :param outputdir: Directory to which any output figures will - be saved, defaults to `'.'`. - :type outputdir: str, optional - :returns: Dictionary representing the energy-calibrated - version of the ceria calibration configuration. - :rtype: dict - """ - # Local modules - from CHAP.utils.general import ( - is_int, - is_int_pair, - is_num_series, - ) - - # Validate arguments: fit_ranges & interactive - if not is_num_series(peak_energies, gt=0): - self.logger.exception( - ValueError( - f'Invalid parameter `peak_energies`: {peak_energies}'), - exc_info=False) - if len(peak_energies) < 2: - self.logger.exception( - ValueError('Invalid parameter `peak_energies`: ' - f'{peak_energies} (at least two values required)'), - exc_info=False) - if not is_int(max_peak_index, ge=0, lt=len(peak_energies)): - self.logger.exception( - ValueError( - f'Invalid parameter `max_peak_index`: {max_peak_index}'), - exc_info=False) - if fit_index_ranges is None and not interactive: - self.logger.exception( - RuntimeError( - 'If `fit_index_ranges` is not explicitly provided, ' - + self.__class__.__name__ - + ' must be run with `interactive=True`.'), - exc_info=False) - if (fit_index_ranges is not None - and (not isinstance(fit_index_ranges, list) - or any(not is_int_pair(v, ge=0) - for v in fit_index_ranges))): - self.logger.exception( - ValueError('Invalid parameter `fit_index_ranges`: ' - f'{fit_index_ranges}'), - exc_info=False) - max_peak_energy = peak_energies[max_peak_index] - peak_energies.sort() - max_peak_index = peak_energies.index(max_peak_energy) - - # Validate arguments: load the calibration configuration - try: - calibration_config = self.get_config( - data, 'edd.models.MCACeriaCalibrationConfig', - inputdir=inputdir) - except Exception as data_exc: - self.logger.info('No valid calibration config in input pipeline ' - 'data, using config parameter instead.') - try: - # Local modules - from CHAP.edd.models import MCACeriaCalibrationConfig - - calibration_config = MCACeriaCalibrationConfig( - **config, inputdir=inputdir) - except Exception as dict_exc: - raise RuntimeError from dict_exc - - # Calibrate detector channel energies based on fluorescence peaks. - for detector in calibration_config.detectors: - energy_calibration_coeffs = self.calibrate( - calibration_config, detector, fit_index_ranges, - peak_energies, max_peak_index, peak_index_fit_delta, - max_energy_kev, save_figures, interactive, outputdir) - detector.energy_calibration_coeffs = energy_calibration_coeffs - - return calibration_config.dict() - - def calibrate(self, calibration_config, detector, fit_index_ranges, - peak_energies, max_peak_index, peak_index_fit_delta, - max_energy_kev, save_figures, interactive, outputdir): - """Return energy_calibration_coeffs (a, b, and c) for - quadratically converting the current detector's MCA channels - to bin energies. - - :param calibration_config: Calibration configuration. - :type calibration_config: MCACeriaCalibrationConfig - :param detector: Configuration of the current detector. - :type detector: MCAElementCalibrationConfig - :param fit_index_ranges: Explicit ranges of uncalibrated MCA - channel index ranges to include when performing a fit of - the given peaks to the provied MCA spectrum. Use this - parameter or select it interactively by running a pipeline - with `config.interactive: True`. - :type fit_ranges: list[list[float]] - :param peak_energies: Theoretical locations of peaks to use - for calibrating the MCA channel energies. It is _strongly_ - recommended to use fluorescence peaks. - :type peak_energies: list[float] - :param max_peak_index: Index of the peak in `peak_energies` - with the highest amplitude. - :type peak_energies: int - :param peak_index_fit_delta: Set boundaries on the fit peak - centers when performing the fit. The min/max possible - values for the peak centers will be the initial values - ± `peak_index_fit_delta`. - :type peak_index_fit_delta: int - :param max_energy_kev: Maximum channel energy of the MCA in - keV, defaults to 200.0. - :type max_energy_kev: float - :param save_figures: Save .pngs of plots for checking inputs & - outputs of this Processor. - :type save_figures: bool - :param interactive: Allows for user interactions. - :type interactive: bool - :param outputdir: Directory to which any output figures will - be saved. - :type outputdir: str - :returns: Slope and intercept for linearly correcting the - detector's MCA channels to bin energies. - :rtype: tuple[float, float] - """ - # Local modules - from CHAP.utils.fit import Fit - from CHAP.utils.general import ( - index_nearest, - index_nearest_down, - index_nearest_up, - select_mask_1d, - ) - - self.logger.debug(f'Calibrating detector {detector.detector_name}') - spectrum = calibration_config.mca_data(detector) - uncalibrated_energies = np.linspace( - 0, max_energy_kev, detector.num_bins) - bins = np.arange(detector.num_bins, dtype=np.int16) - - # Blank out data below bin 500 (~25keV) as well as the last bin - energy_mask = np.ones(detector.num_bins, dtype=np.int16) - energy_mask[:500] = 0 - energy_mask[-1] = 0 - spectrum = spectrum*energy_mask - - # Select the mask/detector channel ranges for fitting - if save_figures: - filename = os.path.join( - outputdir, - f'{detector.detector_name}_mca_energy_calibration_mask.png') - else: - filename = None - mask, fit_index_ranges = select_mask_1d( - spectrum, x=bins, preselected_index_ranges=fit_index_ranges, - xlabel='Detector channel', ylabel='Intensity', - min_num_index_ranges=1, interactive=False,#RV interactive, - filename=filename) - self.logger.debug( - f'Selected index ranges to fit: {fit_index_ranges}') - - # Get the intial peak positions for fitting - if save_figures: - filename = os.path.join( - outputdir, - f'{detector.detector_name}' - '_mca_energy_calibration_initial_peak_positions.png') - else: - filename = None - input_indices = [index_nearest(uncalibrated_energies, energy) - for energy in peak_energies] - initial_peak_indices = self._get_initial_peak_positions( - spectrum*mask.astype(np.int32), fit_index_ranges, input_indices, - max_peak_index, interactive, filename, detector.detector_name) - - spectrum_fit = Fit(spectrum[mask], x=bins[mask]) - for i, index in enumerate(initial_peak_indices): - spectrum_fit.add_model( - 'gaussian', prefix=f'peak{i+1}_', parameters=( - {'name': 'amplitude', 'min': 0.0}, - {'name': 'center', 'value': index, - 'min': index - peak_index_fit_delta, - 'max': index + peak_index_fit_delta}, - {'name': 'sigma', 'value': 1.0, 'min': 0.2, 'max': 8.0}, - )) - self.logger.debug('Fitting spectrum') - spectrum_fit.fit() - - fit_peak_indices = sorted([ - spectrum_fit.best_values[f'peak{i+1}_center'] - for i in range(len(initial_peak_indices))]) - self.logger.debug(f'Fit peak centers: {fit_peak_indices}') - - #RV for now stick with a linear energy correction - energy_fit = Fit.fit_data( - peak_energies, 'linear', x=fit_peak_indices, nan_policy='omit') - a = 0.0 - b = float(energy_fit.best_values['slope']) - c = float(energy_fit.best_values['intercept']) - - # Reference plot to see fit results: - if interactive or save_figures: - # Third part modules - import matplotlib.pyplot as plt - - fig, axs = plt.subplots(1, 2, figsize=(11, 4.25)) - fig.suptitle( - f'Detector {detector.detector_name} Energy Calibration') - # Left plot: raw MCA data & best fit of peaks - axs[0].set_title(f'MCA Spectrum Peak Fit') - axs[0].set_xlabel('Detector channel') - axs[0].set_ylabel('Intensity (a.u)') - axs[0].plot(bins[mask], spectrum[mask], 'b.', label='MCA data') - axs[0].plot( - bins[mask], spectrum_fit.best_fit, 'r', label='Best fit') - axs[0].legend() - # Right plot: linear fit of theoretical peak energies vs - # fit peak locations - axs[1].set_title( - 'Channel Energies vs. Channel Indices') - axs[1].set_xlabel('Detector channel') - axs[1].set_ylabel('Channel energy (keV)') - axs[1].plot(fit_peak_indices, peak_energies, - c='b', marker='o', ms=6, mfc='none', ls='', - label='Initial peak positions') - axs[1].plot(fit_peak_indices, energy_fit.best_fit, - c='k', marker='+', ms=6, ls='', - label='Fitted peak positions') - axs[1].plot(bins[mask], b*bins[mask] + c, 'r', - label='Best linear fit') - axs[1].legend() - # Add text box showing computed values of linear E - # correction parameters - axs[1].text( - 0.98, 0.02, - 'Calibrated values:\n\n' - f'Linear coefficient:\n {b:.5f} $keV$/channel\n\n' - f'Constant offset:\n {c:.5f} $keV$', - ha='right', va='bottom', ma='left', - transform=axs[1].transAxes, - bbox=dict(boxstyle='round', - ec=(1., 0.5, 0.5), - fc=(1., 0.8, 0.8, 0.8))) - - fig.tight_layout() - - if save_figures: - figfile = os.path.join( - outputdir, - f'{detector.detector_name}_energy_calibration_fit.png') - plt.savefig(figfile) - self.logger.info(f'Saved figure to {figfile}') - if interactive: - plt.show() - - return [a, b, c] - - def _get_initial_peak_positions( - self, y, index_ranges, input_indices, input_max_peak_index, - interactive, filename, detector_name, reset_flag=0): - # Third party modules - import matplotlib.pyplot as plt - from matplotlib.widgets import TextBox, Button - - def change_fig_title(title): - if fig_title: - fig_title[0].remove() - fig_title.pop() - fig_title.append(plt.figtext(*title_pos, title, **title_props)) + def change_fig_title(title): + if fig_title: + fig_title[0].remove() + fig_title.pop() + fig_title.append(plt.figtext(*title_pos, title, **title_props)) def change_error_text(error=''): if error_texts: @@ -1504,11 +1067,415 @@ def select_peaks(): if interactive and len(peak_indices) != num_peak: reset_flag += 1 return self._get_initial_peak_positions( - y, index_ranges, input_indices, max_peak_index,interactive, + y, index_ranges, input_indices, max_peak_index, interactive, filename, detector_name, reset_flag=reset_flag) return peak_indices +class MCATthCalibrationProcessor(Processor): + """Processor to calibrate the 2&theta angle and fine tune the + energy calibration coefficients for an EDD experimental setup. + """ + + def process(self, + data, + config=None, + tth_initial_guess=None, + include_energy_ranges=None, + quadratic_energy_calibration=False, + save_figures=False, + inputdir='.', + outputdir='.', + interactive=False): + """Return the calibrated 2&theta value and the fine tuned + energy calibration coefficients to convert MCA channel + indices to MCA channel energies. + + :param data: Input configuration for the raw data & tuning + procedure. + :type data: list[PipelineData] + :param config: Initialization parameters for an instance of + CHAP.edd.models.MCATthCalibrationConfig, defaults to + None. + :type config: dict, optional + :param tth_initial_guess: Initial guess for 2&theta to supercede + the values from the energy calibration detector cofiguration on + each of the detectors. + :type tth_initial_guess: float, optional + :param include_energy_ranges: List of MCA channel energy ranges + in keV whose data should be included after applying a mask + (bounds are inclusive). If specified, these supercede the + values from the energy calibration detector cofiguration on + each of the detectors. + :type include_energy_ranges: list[[float, float]], optional + :param quadratic_energy_calibration: Adds a quadratic term to + the detector channel index to energy conversion, defaults + to `False` (linear only). + :type quadratic_energy_calibration: bool, optional + :param save_figures: Save .pngs of plots for checking inputs & + outputs of this Processor, defaults to False. + :type save_figures: bool, optional + :param outputdir: Directory to which any output figures will + be saved, defaults to '.'. + :type outputdir: str, optional + :param inputdir: Input directory, used only if files in the + input configuration are not absolute paths, + defaults to '.'. + :type inputdir: str, optional + :param interactive: Allows for user interactions, defaults to + False. + :type interactive: bool, optional + :raises RuntimeError: Invalid or missing input configuration. + :return: Original configuration with the tuned values for + 2&theta and the linear correction parameters added. + :rtype: dict[str,float] + """ + try: + calibration_config = self.get_config( + data, 'edd.models.MCATthCalibrationConfig', + inputdir=inputdir) + except Exception as data_exc: + self.logger.info('No valid calibration config in input pipeline ' + 'data, using config parameter instead.') + try: + # Local modules + from CHAP.edd.models import MCATthCalibrationConfig + + calibration_config = MCATthCalibrationConfig( + **config, inputdir=inputdir) + except Exception as dict_exc: + raise RuntimeError from dict_exc + + self.logger.debug(f'In process: save_figures = {save_figures}; ' + f'interactive = {interactive}') + + for detector in calibration_config.detectors: + if tth_initial_guess is not None: + detector.tth_initial_guess = tth_initial_guess + if include_energy_ranges is not None: + detector.include_energy_ranges = include_energy_ranges + tth, energy_calibration_coeffs = self.calibrate( + calibration_config, detector, + quadratic_energy_calibration=quadratic_energy_calibration, + save_figures=save_figures, interactive=interactive, + outputdir=outputdir) + detector.tth_calibrated = tth + detector.energy_calibration_coeffs = energy_calibration_coeffs + + return calibration_config.dict() + + def calibrate(self, + calibration_config, + detector, + quadratic_energy_calibration=False, + save_figures=False, + outputdir='.', + interactive=False): + """Iteratively calibrate 2&theta by fitting selected peaks of + an MCA spectrum until the computed strain is sufficiently + small. Use the fitted peak locations to determine linear + correction parameters for the MCA channel energies. + + :param calibration_config: Object configuring the CeO2 + calibration procedure for an MCA detector. + :type calibration_config: + CHAP.edd.models.MCATthCalibrationConfig + :param detector: A single MCA detector element configuration. + :type detector: CHAP.edd.models.MCAElementCalibrationConfig + :param quadratic_energy_calibration: Adds a quadratic term to + the detector channel index to energy conversion, defaults + to `False` (linear only). + :type quadratic_energy_calibration: bool, optional + :param save_figures: Save .pngs of plots for checking inputs & + outputs of this Processor, defaults to False. + :type save_figures: bool, optional + :param outputdir: Directory to which any output figures will + be saved, defaults to '.'. + :type outputdir: str, optional + :param interactive: Allows for user interactions, defaults to + False. + :type interactive: bool, optional + :raises ValueError: No value provided for included bin ranges + or the fitted HKLs for the MCA detector element. + :return: Calibrated values of 2&theta and the correction + parameters for MCA channel energies: tth, and + energy_calibration_coeffs. + :rtype: float, [float, float, float] + """ + # Local modules + if interactive or save_figures: + from CHAP.edd.utils import ( + select_tth_initial_guess, + select_mask_and_hkls, + ) + from CHAP.edd.utils import get_peak_locations + from CHAP.utils.fit import Fit + + # Get the unique HKLs and lattice spacings for the calibration + # material + hkls, ds = calibration_config.material.unique_hkls_ds( + tth_tol=detector.hkl_tth_tol, tth_max=detector.tth_max) + + # Collect raw MCA data of interest + mca_bin_energies = detector.energies + mca_data = calibration_config.mca_data(detector) + + # Blank out data below 25 keV as well as the last bin + energy_mask = np.where(mca_bin_energies >= 25.0, 1, 0) + energy_mask[-1] = 0 + mca_data = mca_data*energy_mask + + # Adjust initial tth guess + if save_figures: + filename = os.path.join( + outputdir, + f'{detector.detector_name}_calibration_tth_initial_guess.png') + else: + filename = None + detector.tth_initial_guess = select_tth_initial_guess( + mca_bin_energies, mca_data, hkls, ds, + detector.tth_initial_guess)#RV FIX, interactive, filename) + self.logger.debug(f'tth_initial_guess = {detector.tth_initial_guess}') + + # Select mask & HKLs for fitting + if save_figures: + filename = os.path.join( + outputdir, + f'{detector.detector_name}_calibration_fit_mask_hkls.png') + include_bin_ranges, hkl_indices = select_mask_and_hkls( + mca_bin_energies, mca_data, hkls, ds, + detector.tth_initial_guess, detector.include_bin_ranges, + detector_name=detector.detector_name, + flux_energy_range=calibration_config.flux_file_energy_range(), + label='MCA data', interactive=interactive, filename=filename) + detector.include_energy_ranges = detector.get_include_energy_ranges( + include_bin_ranges) + detector.set_hkl_indices(hkl_indices) + self.logger.debug( + f'include_energy_ranges = {detector.include_energy_ranges}') + self.logger.debug( + f'hkl_indices = {detector.hkl_indices}') + if not detector.include_energy_ranges: + raise ValueError( + 'No value provided for include_energy_ranges. ' + 'Provide them in the MCA Tth Calibration Configuration ' + 'or re-run the pipeline with the --interactive flag.') + if not detector.hkl_indices: + raise ValueError( + 'Unable to get values for hkl_indices for the provided ' + 'value of include_energy_ranges. Change its value in ' + 'the detector\'s MCA Tth Calibration Configuration or ' + 're-run the pipeline with the --interactive flag.') + mca_mask = detector.mca_mask() + fit_mca_energies = mca_bin_energies[mca_mask] + fit_mca_intensities = mca_data[mca_mask] + + # Correct raw MCA data for variable flux at different energies + flux_correct = \ + calibration_config.flux_correction_interpolation_function() + if flux_correct is not None: + mca_intensity_weights = flux_correct(fit_mca_energies) + fit_mca_intensities = fit_mca_intensities / mca_intensity_weights + + # Get the HKLs and lattice spacings that will be used for + # fitting + # Restrict the range for the centers with some margin to + # prevent having centers near the edge of the fitting range + delta = 0.1 * (fit_mca_energies[-1]-fit_mca_energies[0]) + centers_range = ( + max(0.0, fit_mca_energies[0]-delta), fit_mca_energies[-1]+delta) + fit_hkls = np.asarray([hkls[i] for i in detector.hkl_indices]) + fit_ds = np.asarray([ds[i] for i in detector.hkl_indices]) + c_1 = fit_hkls[:,0]**2 + fit_hkls[:,1]**2 + fit_hkls[:,2]**2 + tth = float(detector.tth_initial_guess) + fit_E0 = get_peak_locations(fit_ds, tth) + for iter_i in range(calibration_config.max_iter): + self.logger.debug( + f'Tuning tth: iteration no. {iter_i}, starting value = {tth} ') + + # Perform the uniform fit first + + # Get expected peak energy locations for this iteration's + # starting value of tth + _fit_E0 = get_peak_locations(fit_ds, tth) + + # Run the uniform fit + fit = Fit(fit_mca_intensities, x=fit_mca_energies) + fit.create_multipeak_model( + _fit_E0, fit_type='uniform', background=detector.background, + centers_range=centers_range, fwhm_min=0.1, fwhm_max=1.0) + fit.fit() + + # Extract values of interest from the best values for the + # uniform fit parameters + uniform_best_fit = fit.best_fit + uniform_residual = fit.residual + uniform_fit_centers = [ + fit.best_values[f'peak{i+1}_center'] + for i in range(len(fit_hkls))] + uniform_a = fit.best_values['scale_factor'] + uniform_strain = np.log( + (uniform_a + / calibration_config.material.lattice_parameters)) # CeO2 is cubic, so this is fine here. + + # Next, perform the unconstrained fit + + # Use the peak parameters from the uniform fit as the + # initial guesses for peak locations in the unconstrained + # fit + fit.create_multipeak_model(fit_type='unconstrained') + fit.fit() + + # Extract values of interest from the best values for the + # unconstrained fit parameters + unconstrained_best_fit = fit.best_fit + unconstrained_residual = fit.residual + unconstrained_fit_centers = np.array( + [fit.best_values[f'peak{i+1}_center'] + for i in range(len(fit_hkls))]) + unconstrained_a = np.sqrt(c_1)*abs(get_peak_locations( + unconstrained_fit_centers, tth)) + unconstrained_strains = np.log( + (unconstrained_a + / calibration_config.material.lattice_parameters)) + unconstrained_strain = np.mean(unconstrained_strains) + unconstrained_tth = tth * (1.0 + unconstrained_strain) + + # Update tth for the next iteration of tuning + prev_tth = tth + tth = float(unconstrained_tth) + + # Stop tuning tth at this iteration if differences are + # small enough + if abs(tth - prev_tth) < calibration_config.tune_tth_tol: + break + + # Fit line to expected / computed peak locations from the last + # unconstrained fit. + a_init, b_init, c_init = detector.energy_calibration_coeffs + if quadratic_energy_calibration: + fit = Fit.fit_data( + _fit_E0, 'quadratic', x=unconstrained_fit_centers, + nan_policy='omit') + a_fit = fit.best_values['a'] + b_fit = fit.best_values['b'] + c_fit = fit.best_values['c'] + else: + fit = Fit.fit_data( + _fit_E0, 'linear', x=unconstrained_fit_centers, + nan_policy='omit') + a_fit = 0.0 + b_fit = fit.best_values['slope'] + c_fit = fit.best_values['intercept'] + a_final = float(b_init**2*a_fit) + b_final = float(2*b_init*c_init*a_fit + b_init*b_fit) + c_final = float(c_init**2*a_fit + c_init*b_fit + c_fit) + + if interactive or save_figures: + # Third party modules + import matplotlib.pyplot as plt + + fig, axs = plt.subplots(2, 2, sharex='all', figsize=(11, 8.5)) + fig.suptitle( + f'Detector {detector.detector_name} Tth Calibration') + + # Upper left axes: Input data & best fits + axs[0,0].set_title('Tth Calibration Fits') + axs[0,0].set_xlabel('Energy (keV)') + axs[0,0].set_ylabel('Intensity (a.u)') + for i, hkl_E in enumerate(fit_E0): + # KLS: annotate indicated HKLs w millier indices + axs[0,0].axvline(hkl_E, color='k', linestyle='--') + axs[0,0].text(hkl_E, 1, str(fit_hkls[i])[1:-1], + ha='right', va='top', rotation=90, + transform=axs[0,0].get_xaxis_transform()) + axs[0,0].plot(fit_mca_energies, uniform_best_fit, + label='Single strain') + axs[0,0].plot(fit_mca_energies, unconstrained_best_fit, + label='Unconstrained') + #axs[0,0].plot(fit_mca_energies, MISSING?, label='least squares') + axs[0,0].plot(fit_mca_energies, fit_mca_intensities, + label='Flux-corrected & wasked MCA data') + axs[0,0].legend() + + # Lower left axes: fit residuals + axs[1,0].set_title('Fit Residuals') + axs[1,0].set_xlabel('Energy (keV)') + axs[1,0].set_ylabel('Residual (a.u)') + axs[1,0].plot(fit_mca_energies, + uniform_residual, + label='Single strain') + axs[1,0].plot(fit_mca_energies, + unconstrained_residual, + label='Unconstrained') + axs[1,0].legend() + + # Upper right axes: E vs strain for each fit + axs[0,1].set_title('HKL Energy vs. Microstrain') + axs[0,1].set_xlabel('Energy (keV)') + axs[0,1].set_ylabel('Strain (\u03BC\u03B5)') + axs[0,1].axhline(uniform_strain * 1e6, + linestyle='--', label='Single strain') + axs[0,1].plot(fit_E0, unconstrained_strains * 1e6, + color='C1', marker='s', label='Unconstrained') + axs[0,1].axhline(unconstrained_strain * 1e6, + color='C1', linestyle='--', + label='Unconstrained: unweighted mean') + axs[0,1].legend() + + # Lower right axes: theoretical HKL E vs fit HKL E for + # each fit + axs[1,1].set_title('Theoretical vs. Fit HKL Energies') + axs[1,1].set_xlabel('Energy (keV)') + axs[1,1].set_ylabel('Energy (keV)') + axs[1,1].plot(fit_E0, uniform_fit_centers, + c='b', marker='o', ms=6, mfc='none', ls='', + label='Single strain') + axs[1,1].plot(fit_E0, unconstrained_fit_centers, + c='k', marker='+', ms=6, ls='', + label='Unconstrained') + if quadratic_energy_calibration: + axs[1,1].plot(fit_E0, (a_fit*_fit_E0 + b_fit)*_fit_E0 + c_fit, + color='C1', label='Unconstrained: quadratic fit') + else: + axs[1,1].plot(fit_E0, b_fit*_fit_E0 + c_fit, color='C1', + label='Unconstrained: linear fit') + axs[1,1].legend() + + # Add a text box showing final calibrated values + txt = 'Calibrated values:' \ + f'\nTakeoff angle:\n {tth:.5f}$^\circ$' + if True or recalibrate_energy: + if quadratic_energy_calibration: + txt += '\nQuadratic coefficient:' \ + f'\n {a_final:.5e} $keV$/channel$^2$' + txt += '\nLinear coefficient:' \ + f'\n {b_final:.5f} $keV$/channel' \ + f'\nConstant offset:\n {c_final:.5f}' + axs[1,1].text( + 0.98, 0.02, txt, + ha='right', va='bottom', ma='left', + transform=axs[1,1].transAxes, + bbox=dict(boxstyle='round', + ec=(1., 0.5, 0.5), + fc=(1., 0.8, 0.8, 0.8))) + + fig.tight_layout() + + if save_figures: + figfile = os.path.join( + outputdir, + f'{detector.detector_name}_ceria_calibration_fits.png') + plt.savefig(figfile) + self.logger.info(f'Saved figure to {figfile}') + if interactive: + plt.show() + + # Return the calibrated 2&theta value and the final energy + # calibration coefficients + return tth, [a_final, b_final, c_final] + + class MCADataProcessor(Processor): """A Processor to return data from an MCA, restuctured to incorporate the shape & metadata associated with a map @@ -1538,7 +1505,7 @@ def process(self, map_config = self.get_config( data, 'common.models.map.MapConfig', inputdir=inputdir) ceria_calibration_config = self.get_config( - data, 'edd.models.MCACeriaCalibrationConfig', inputdir=inputdir) + data, 'edd.models.MCATthCalibrationConfig', inputdir=inputdir) nxroot = self.get_nxroot(map_config, ceria_calibration_config) return nxroot @@ -1554,7 +1521,7 @@ def get_nxroot(self, map_config, calibration_config): :type map_config: CHAP.common.models.MapConfig. :param calibration_config: The calibration configuration. :type calibration_config: - CHAP.edd.models.MCACeriaCalibrationConfig + CHAP.edd.models.MCATthCalibrationConfig :return: A map of the calibrated and flux-corrected MCA data. :rtype: nexusformat.nexus.NXroot """ @@ -1665,7 +1632,7 @@ def process(self, data, spec_file, scan_number, scan_step_index=None, raise TypeError(msg) calibration_config = self.get_config( - data, 'edd.models.MCACeriaCalibrationConfig') + data, 'edd.models.MCATthCalibrationConfig') scanparser = ScanParser(spec_file, scan_number) fig, ax = plt.subplots(1, 1, figsize=(11, 8.5)) @@ -1739,7 +1706,7 @@ def process(self, """ # Get required configuration models from input data ceria_calibration_config = self.get_config( - data, 'edd.models.MCACeriaCalibrationConfig', inputdir=inputdir) + data, 'edd.models.MCATthCalibrationConfig', inputdir=inputdir) try: strain_analysis_config = self.get_config( data, 'edd.models.StrainAnalysisConfig', inputdir=inputdir) @@ -1780,7 +1747,7 @@ def get_nxroot(self, :type map_config: CHAP.common.models.map.MapConfig :param ceria_calibration_config: The calibration configuration. :type ceria_calibration_config: - 'CHAP.edd.models.MCACeriaCalibrationConfig' + 'CHAP.edd.models.MCATthCalibrationConfig' :param strain_analysis_config: Strain analysis processing configuration. :type strain_analysis_config: @@ -1983,12 +1950,12 @@ def linkdims(nxgroup, field_dims=[], oversampling_axis={}): if not detector.include_energy_ranges: raise ValueError( 'No value provided for include_energy_ranges. ' - 'Provide them in the MCA Ceria Calibration Configuration, ' + 'Provide them in the MCA Tth Calibration Configuration, ' 'or re-run the pipeline with the --interactive flag.') if not detector.hkl_indices: raise ValueError( 'No value provided for hkl_indices. Provide them in ' - 'the detector\'s MCA Ceria Calibration Configuration, or' + 'the detector\'s MCA Tth Calibration Configuration, or' ' re-run the pipeline with the --interactive flag.') # Setup NXdata group