Skip to content

Commit

Permalink
Merge pull request #100 from CHESSComputing/edd-fall2023
Browse files Browse the repository at this point in the history
Edd fall2023
  • Loading branch information
rolfverberg authored Feb 1, 2024
2 parents d7898ee + ed293e2 commit 490a730
Show file tree
Hide file tree
Showing 12 changed files with 1,618 additions and 315 deletions.
8 changes: 7 additions & 1 deletion CHAP/common/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def read(self, filename):
class H5Reader(Reader):
"""Reader for h5 files.
"""
def read(self, filename, h5path='/'):
def read(self, filename, h5path='/', idx=None):
"""Return the data object stored at `h5path` in an h5-file.
:param filename: The name of the h5-file to read from.
Expand All @@ -54,6 +54,8 @@ def read(self, filename, h5path='/'):
from h5py import File

data = File(filename, 'r')[h5path]
if idx is not None:
data = data[tuple(idx)]
return data


Expand Down Expand Up @@ -166,6 +168,8 @@ def read(
# Create empty NXfields of appropriate shape for raw
# detector data
for detector_name in detector_names:
if not isinstance(detector_name, str):
detector_name = str(detector_name)
detector_data = map_config.get_detector_data(
detector_name, (0,) * len(map_config.shape))
nxentry.data[detector_name] = NXfield(value=np.zeros(
Expand All @@ -179,6 +183,8 @@ def read(
nxentry.data[data.label][map_index] = map_config.get_value(
data, map_index)
for detector_name in detector_names:
if not isinstance(detector_name, str):
detector_name = str(detector_name)
nxentry.data[detector_name][map_index] = \
map_config.get_detector_data(detector_name, map_index)

Expand Down
2 changes: 2 additions & 0 deletions CHAP/edd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
"""
# from CHAP.edd.reader import
from CHAP.edd.processor import (DiffractionVolumeLengthProcessor,
LatticeParameterRefinementProcessor,
MCACeriaCalibrationProcessor,
MCADataProcessor,
MCAEnergyCalibrationProcessor,
StrainAnalysisProcessor)
# from CHAP.edd.writer import

Expand Down
142 changes: 94 additions & 48 deletions CHAP/edd/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,70 @@ class MCAElementConfig(BaseModel):
:type detector_name: str
:ivar num_bins: Number of MCA channels.
:type num_bins: int, optional
:ivar include_bin_ranges: List of MCA channel index ranges whose
data should be included after applying a mask (the bounds are
inclusive), defaults to `[]`
:type include_bin_ranges: list[[int, int]], optional
:ivar include_energy_ranges: List of MCA channel energy ranges
whose data should be included after applying a mask (the
bounds are inclusive), defaults to `[[50,150]]`
:type include_energy_ranges: list[[float, float]], optional
"""
detector_name: constr(strip_whitespace=True, min_length=1) = 'mca1'
num_bins: Optional[conint(gt=0)]
include_bin_ranges: conlist(
max_energy_kev: confloat(gt=0) = 200
include_energy_ranges: conlist(
min_items=1,
item_type=conlist(
item_type=conint(ge=0),
item_type=confloat(ge=0),
min_items=2,
max_items=2)) = []
max_items=2)) = [[50,150]]

@validator('include_bin_ranges', each_item=True)
def validate_include_bin_range(cls, value, values):
"""Ensure that no bin ranges are outside the boundary of the
@validator('include_energy_ranges', each_item=True)
def validate_include_energy_range(cls, value, values):
"""Ensure that no energy ranges are outside the boundary of the
detector.
:param value: Field value to validate (`include_bin_ranges`).
:param value: Field value to validate (`include_energy_ranges`).
:type values: dict
:param values: Dictionary of previously validated field values.
:type values: dict
:return: The validated value of `include_bin_ranges`.
:return: The validated value of `include_energy_ranges`.
:rtype: dict
"""
num_bins = values.get('num_bins')
if num_bins is not None:
value[1] = min(value[1], num_bins-1)
if value[0] >= value[1]:
raise ValueError('Invalid bin range in include_bin_ranges '
f'({value})')
max_energy_kev = values.get('max_energy_kev')
value.sort()
if value[1] > max_energy_kev:
value[1] = max_energy_kev
return value

@property
def include_bin_ranges(self):
"""Return the value of `include_energy_ranges` represented in
terms of channel indices instead of channel energies.
"""
from CHAP.utils.general import index_nearest_down, index_nearest_upp

include_bin_ranges = []
energies = np.linspace(0, self.max_energy_kev, self.num_bins)
for e_min, e_max in self.include_energy_ranges:
include_bin_ranges.append(
[index_nearest_down(energies, e_min),
index_nearest_upp(energies, e_max)])
return include_bin_ranges

def get_energy_ranges(self, bin_ranges):
"""Given a list of channel index ranges, return the
correspongin list of channel energy ranges.
:param bin_ranges: A list of channel bin ranges to convert to
energy ranges.
:type bin_ranges: list[list[int]]
:returns: Energy ranges
:rtype: list[list[float]]
"""
energies = np.linspace(0, self.max_energy_kev, self.num_bins)
return [[energies[i] for i in range_] for range_ in bin_ranges]

def mca_mask(self):
"""Get a boolean mask array to use on this MCA element's data.
Note that the bounds of self.include_bin_ranges are inclusive.
Note that the bounds of self.include_energy_ranges are inclusive.
:return: Boolean mask array.
:rtype: numpy.ndarray
Expand All @@ -94,9 +121,9 @@ def dict(self, *args, **kwargs):
:rtype: dict
"""
d = super().dict(*args, **kwargs)
d['include_bin_ranges'] = [
list(d['include_bin_ranges'][i]) \
for i in range(len(d['include_bin_ranges']))]
d['include_energy_ranges'] = [
[float(energy) for energy in d['include_energy_ranges'][i]]
for i in range(len(d['include_energy_ranges']))]
return d


Expand Down Expand Up @@ -172,7 +199,7 @@ def validate_scan(cls, values):
def validate_detectors(cls, values):
"""Fill in values for _scanparser / _parfile (if applicable).
Fill in each detector's num_bins field, if needed.
Check each detector's include_bin_ranges field against the
Check each detector's include_energy_ranges field against the
flux file, if available.
:param values: Dictionary of previously validated field values.
Expand Down Expand Up @@ -212,20 +239,21 @@ def validate_detectors(cls, values):
)
flux = np.loadtxt(flux_file)
flux_file_energies = flux[:,0]/1.e3
energy_range = (flux_file_energies.min(), flux_file_energies.max())
flux_e_min = flux_file_energies.min()
flux_e_max = flux_file_energies.max()
for detector in detectors:
mca_bin_energies = np.linspace(
0, detector.max_energy_kev, detector.num_bins)
e_min = index_nearest_upp(mca_bin_energies, energy_range[0])
e_max = index_nearest_down(mca_bin_energies, energy_range[1])
for i, (min_, max_) in enumerate(
deepcopy(detector.include_bin_ranges)):
if min_ < e_min or max_ > e_max:
bin_range = [max(min_, e_min), min(max_, e_max)]
print(f'WARNING: include_bin_ranges[{i}] out of range '
f'({detector.include_bin_ranges[i]}): adjusted '
f'to {bin_range}')
detector.include_bin_ranges[i] = bin_range
for i, (det_e_min, det_e_max) in enumerate(
deepcopy(detector.include_energy_ranges)):
if det_e_min < flux_e_min or det_e_max > flux_e_max:
energy_range = [min(det_e_min, flux_e_min),
max(det_e_max, flux_e_max)]
print(
f'WARNING: include_energy_ranges[{i}] out of range'
f' ({detector.include_energy_ranges[i]}): adjusted'
f' to {energy_range}')
detector.include_energy_ranges[i] = energy_range

return values

Expand Down Expand Up @@ -381,6 +409,8 @@ class MCAElementCalibrationConfig(MCAElementConfig):
:ivar hkl_indices: List of unique HKL indices to fit peaks for in
the calibration routine, defaults to `[]`.
:type hkl_indices: list[int], optional
:ivar background: Background model for peak fitting.
:type background: str, list[str], optional
:ivar tth_initial_guess: Initial guess for 2&theta,
defaults to `5.0`.
:type tth_initial_guess: float, optional
Expand All @@ -394,7 +424,7 @@ class MCAElementCalibrationConfig(MCAElementConfig):
:type tth_calibrated: float, optional
:ivar slope_calibrated: Calibrated value for detector channel
energy correction linear slope.
:type slope_calibrated: float, optional
:type slope_calibrated: float, optional
:ivar intercept_calibrated: Calibrated value for detector channel
energy correction y-intercept.
:type intercept_calibrated: float, optional
Expand All @@ -403,6 +433,7 @@ class MCAElementCalibrationConfig(MCAElementConfig):
tth_max: confloat(gt=0, allow_inf_nan=False) = 90.0
hkl_tth_tol: confloat(gt=0, allow_inf_nan=False) = 0.15
hkl_indices: Optional[conlist(item_type=conint(ge=0), min_items=1)] = []
background: Optional[Union[str, list]]
tth_initial_guess: confloat(gt=0, le=tth_max, allow_inf_nan=False) = 5.0
slope_initial_guess: float = 1.0
intercept_initial_guess: float = 0.0
Expand Down Expand Up @@ -435,10 +466,19 @@ class MCAElementDiffractionVolumeLengthConfig(MCAElementConfig):
:ivar dvl_measured: Placeholder for the measured diffraction
volume length before writing the data to file.
:type dvl_measured: float, optional
:ivar fit_amplitude: Placeholder for amplitude of the gaussian fit.
:type fit_amplitude: float, optional
:ivar fit_center: Placeholder for center of the gaussian fit.
:type fit_center: float, optional
:ivar fit_sigma: Placeholder for sigma of the gaussian fit.
:type fit_sigma: float, optional
"""
measurement_mode: Optional[Literal['manual', 'auto']] = 'auto'
sigma_to_dvl_factor: Optional[Literal[3.5, 2.0, 4.0]] = 3.5
dvl_measured: Optional[confloat(gt=0)] = None
fit_amplitude: Optional[float] = None
fit_center: Optional[float] = None
fit_sigma: Optional[float] = None

def dict(self, *args, **kwargs):
"""Return a representation of this configuration in a
Expand All @@ -452,6 +492,8 @@ def dict(self, *args, **kwargs):
d = super().dict(*args, **kwargs)
if self.measurement_mode == 'manual':
del d['sigma_to_dvl_factor']
for param in ('amplitude', 'center', 'sigma'):
d[f'fit_{param}'] = float(d[f'fit_{param}'])
return d


Expand All @@ -460,10 +502,15 @@ class DiffractionVolumeLengthConfig(MCAScanDataConfig):
volume length calculation for an EDD setup using a steel-foil
raster scan.
:ivar sample_thickness: Thickness of scanned foil sample. Quantity
must be provided in the same units as the values of the
scanning motor.
:type sample_thickness: float
:ivar detectors: Individual detector element DVL
measurement configurations
:type detectors: list[MCAElementDiffractionVolumeLengthConfig]
"""
sample_thickness: float
detectors: conlist(min_items=1,
item_type=MCAElementDiffractionVolumeLengthConfig)

Expand All @@ -481,18 +528,6 @@ def scanned_vals(self):
scan_numbers=self._parfile.good_scan_numbers())
return self.scanparser.spec_scan_motor_vals[0]

@property
def scanned_dim_lbl(self):
"""Return a label for plot axes corresponding to the scanned
dimension.
:return: Name of scanned motor.
:rtype: str
"""
if self._parfile is not None:
return self.scan_column
return self.scanparser.spec_scan_motor_mnes[0]


class CeriaConfig(MaterialConfig):
"""Model for the sample material used in calibrations.
Expand Down Expand Up @@ -629,6 +664,8 @@ class MCAElementStrainAnalysisConfig(MCAElementConfig):
:type hkl_indices: list[int], optional
:ivar background: Background model for peak fitting.
:type background: str, list[str], optional
:ivar num_proc: Number of processors used for peak fitting.
:type num_proc: int, optional
:ivar peak_models: Peak model for peak fitting,
defaults to `'gaussian'`.
:type peak_models: Literal['gaussian', 'lorentzian']],
Expand Down Expand Up @@ -656,7 +693,8 @@ class MCAElementStrainAnalysisConfig(MCAElementConfig):
tth_max: confloat(gt=0, allow_inf_nan=False) = 90.0
hkl_tth_tol: confloat(gt=0, allow_inf_nan=False) = 0.15
hkl_indices: Optional[conlist(item_type=conint(ge=0), min_items=1)] = []
background: Optional[str]
background: Optional[Union[str, list]]
num_proc: Optional[conint(gt=0)] = os.cpu_count()
peak_models: Union[
conlist(item_type=Literal['gaussian', 'lorentzian'], min_items=1),
Literal['gaussian', 'lorentzian']] = 'gaussian'
Expand All @@ -667,6 +705,13 @@ class MCAElementStrainAnalysisConfig(MCAElementConfig):
tth_calibrated: Optional[confloat(gt=0, allow_inf_nan=False)]
slope_calibrated: Optional[confloat(allow_inf_nan=False)]
intercept_calibrated: Optional[confloat(allow_inf_nan=False)]
calibration_bin_ranges: Optional[
conlist(
min_items=1,
item_type=conlist(
item_type=conint(ge=0),
min_items=2,
max_items=2))]
tth_file: Optional[FilePath]
tth_map: Optional[np.ndarray] = None

Expand Down Expand Up @@ -696,6 +741,7 @@ def add_calibration(self, calibration):
'intercept_calibrated', 'num_bins', 'max_energy_kev']
for field in add_fields:
setattr(self, field, getattr(calibration, field))
self.calibration_bin_ranges = calibration.include_bin_ranges

def get_tth_map(self, map_config):
"""Return a map of 2&theta values to use -- may vary at each
Expand Down
Loading

0 comments on commit 490a730

Please sign in to comment.