Skip to content

Commit

Permalink
add support for accessing 23-element data with SMBMCAScanParser
Browse files Browse the repository at this point in the history
  • Loading branch information
keara-soloway committed Dec 5, 2023
1 parent 9aeb031 commit 5d98d9a
Showing 1 changed file with 240 additions and 39 deletions.
279 changes: 240 additions & 39 deletions CHAP/utils/scanparsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,15 @@ class SMBLinearScanParser(LinearScanParser, SMBScanParser):
"""

def get_spec_scan_dwell(self):
if self.spec_macro in ('flymesh', 'flyscan', 'ascan'):
if self.spec_macro in ('flymesh', 'mesh'):
try:
# Try post-summer-2022 format
dwell = float(self.spec_args[4])
except:
# Accommodate pre-summer-2022 format
dwell = float(self.spec_args[8])
return dwell
if self.spec_macro in ('flyscan', 'ascan'):
return float(self.spec_args[4])
if self.spec_macro == 'tseries':
return float(self.spec_args[1])
Expand All @@ -680,8 +688,18 @@ def get_spec_scan_dwell(self):
f'for scans of type {self.spec_macro}')

def get_spec_scan_motor_mnes(self):
if self.spec_macro == 'flymesh':
return (self.spec_args[0], self.spec_args[5])
if self.spec_macro ('flymesh', 'mesh'):
m1_mne = self.spec_args[0]
try:
# Try post-summer-2022 format
dwell = float(self.spec_args[4])
except:
# Accommodate pre-summer-2022 format
m2_mne_i = 4
else:
m2_mne_i = 5
m2_mne = self.spec_args[m2_mne_i]
return (m1_mne, m2_mne)
if self.spec_macro in ('flyscan', 'ascan'):
return (self.spec_args[0],)
if self.spec_macro in ('tseries', 'loopscan'):
Expand All @@ -690,13 +708,27 @@ def get_spec_scan_motor_mnes(self):
f'for scans of type {self.spec_macro}')

def get_spec_scan_motor_vals(self):
if self.spec_macro == 'flymesh':
fast_mot_vals = np.linspace(float(self.spec_args[1]),
float(self.spec_args[2]),
int(self.spec_args[3])+1)
slow_mot_vals = np.linspace(float(self.spec_args[6]),
float(self.spec_args[7]),
int(self.spec_args[8])+1)
if self.spec_macro in ('flymesh', 'mesh'):
m1_start = float(self.spec_args[1])
m1_end = float(self.spec_args[2])
m1_npt = int(self.spec_args[3]) + 1
try:
# Try post-summer-2022 format
dwell = float(self.spec_args[4])
except:
# Accommodate pre-summer-2022 format
m2_start_i = 5
m2_end_i = 6
m2_nint_i = 7
else:
m2_start_i = 6
m2_end_i = 7
m2_nint_i = 8
m2_start = float(self.spec_args[m2_start_i])
m2_end = float(self.spec_args[m2_end_i])
m2_npt = int(self.spec_args[m2_nint_i]) + 1
fast_mot_vals = np.linspace(m1_start, m1_end, m1_npt)
slow_mot_vals = np.linspace(m2_start, m2_end, m2_npt)
return (fast_mot_vals, slow_mot_vals)
if self.spec_macro in ('flyscan', 'ascan'):
mot_vals = np.linspace(float(self.spec_args[1]),
Expand All @@ -709,9 +741,17 @@ def get_spec_scan_motor_vals(self):
f'for scans of type {self.spec_macro}')

def get_spec_scan_shape(self):
if self.spec_macro == 'flymesh':
fast_mot_npts = int(self.spec_args[3])+1
slow_mot_npts = int(self.spec_args[8])+1
if self.spec_macro in ('flymesh', 'mesh'):
fast_mot_npts = int(self.spec_args[3]) + 1
try:
# Try post-summer-2022 format
dwell = float(self.spec_args[4])
except:
# Accommodate pre-summer-2022 format
m2_nint_i = 7
else:
m2_nint_i = 8
slow_mot_npts = int(self.spec_args[m2_nint_i]) + 1
return (fast_mot_npts, slow_mot_npts)
if self.spec_macro in ('flyscan', 'ascan'):
mot_npts = int(self.spec_args[3])+1
Expand All @@ -721,14 +761,6 @@ def get_spec_scan_shape(self):
raise RuntimeError(f'{self.scan_title}: cannot determine scan shape '
f'for scans of type {self.spec_macro}')

def get_spec_scan_dwell(self):
if self.spec_macro == 'flymesh':
return float(self.spec_args[4])
if self.spec_macro in ('flyscan', 'ascan'):
return float(self.spec_args[-1])
raise RuntimeError(f'{self.scan_title}: cannot determine dwell time '
f'for scans of type {self.spec_macro}')

def get_detector_data_path(self):
return os.path.join(self.scan_path, str(self.scan_number))

Expand Down Expand Up @@ -1057,10 +1089,68 @@ class SMBMCAScanParser(MCAScanParser, SMBLinearScanParser):
"""Concrete implementation of a class representing a scan taken
with the typical EDD setup at SMB or FAST.
"""
detector_data_formats = ('spec', 'h5')
def __init__(self, spec_file_name, scan_number, detector_data_format=None):
"""Constructor for SMBMCAScnaParser.
:param spec_file: Path to scan's SPEC file
:type spec_file: str
:param scan_number: Number of the SPEC scan
:type scan_number: int
:param detector_data_format: Format of the MCA data collected,
defaults to None
:type detector_data_format: Optional[Literal["spec", "h5"]]
"""
super().__init__(spec_file_name, scan_number)

def get_detector_num_bins(self, detector_prefix):
with open(self.get_detector_data_file(detector_prefix)) \
as detector_file:
self.detector_data_format = None
if detector_data_format is None:
self.init_detector_data_format()
else:
if detector_data_format.lower() in self.detector_data_formats:
self.detector_data_format = detector_data_format.lower()
else:
raise ValueError(
'Unrecognized value for detector_data_format: '
+ f'{detector_data_format}. Allowed values are: '
+ ', '.join(self.detector_data_formats))
print()


def init_detector_data_format(self):
"""Determine and set a value for the instance variable
`detector_data_format` based on the presence / absence of
detector data files of different formats conventionally
associated with this scan. Also set the corresponding
appropriate value for `_detector_data_path`.
"""
try:
self._detector_data_path = self.scan_path
detector_file = self.get_detector_data_file_spec()
except OSError:
try:
self._detector_data_path = os.path.join(
self.scan_path, str(self.scan_number), 'edd')
detector_file = self.get_detector_data_file_h5()
except OSError:
raise RuntimeError(
f"{self.scan_title}: Can't determine detector data format")
else:
self.detector_data_format = 'h5'
else:
self.detector_data_format = 'spec'

def get_detector_data_path(self):
raise NotImplementedError

def get_detector_num_bins(self, element_index=0):
if self.detector_data_format == 'spec':
return self.get_detector_num_bins_spec()
elif self.detector_data_format == 'h5':
return self.get_detector_num_bins_h5(element_index)

def get_detector_num_bins_spec(self):
with open(self.get_detector_data_file_spec()) as detector_file:
lines = detector_file.readlines()
for line in lines:
if line.startswith('#@CHANN'):
Expand All @@ -1070,33 +1160,101 @@ def get_detector_num_bins(self, detector_prefix):
return int(number_saved)
except:
continue
raise RuntimeError(f'{self.scan_title}: could not find num_bins for '
f'detector {detector_prefix}')

def get_detector_data_path(self):
return self.scan_path
raise RuntimeError(f'{self.scan_title}: could not find num_bins')

def get_detector_data_file(self, detector_prefix, scan_step_index=0):
def get_detector_num_bins_h5(self, element_index):
from h5py import File
detector_file = self.get_detector_data_file_h5()
with File(detector_file) as h5_file:
dset_shape = h5_file['/entry/data/data'].shape
return dset_shape[-1]

def get_detector_data_file(self, scan_step_index=0):
if self.detector_data_format == 'spec':
return self.get_detector_data_file_spec()
elif self.detector_data_format == 'h5':
return self.get_detector_data_file_h5(
scan_step_index=scan_step_index)

def get_detector_data_file_spec(self):
"""Return the filename (full absolute path) to the file
containing spec-formatted MCA data for this scan.
"""
file_name = f'spec.log.scan{self.scan_number}.mca1.mca'
file_name_full = os.path.join(self.detector_data_path, file_name)
if os.path.isfile(file_name_full):
return file_name_full
raise RuntimeError(
f'{self.scan_title}: could not find detector image file')
raise OSError(
'{self.scan_title}: could not find detector image file'
)

def get_detector_data_file_h5(self, scan_step_index=0):
"""Return the filename (full absolute path) to the file
containing h5-formatted MCA data for this scan.
:param scan_step_index:
"""
scan_step = self.get_scan_step(scan_step_index)
if len(self.spec_scan_shape) == 1:
filename_index = 0
elif len(self.spec_scan_shape) == 2:
scan_step = self.get_scan_step(scan_step_index)
filename_index = scan_step[0]
else:
raise NotImplementedError(
'Cannot find detector file for scans with dimension > 2')
file_name = list_smb_mca_detector_files_h5(
self.detector_data_path)[filename_index]
file_name_full = os.path.join(self.detector_data_path, file_name)
if os.path.isfile(file_name_full):
return file_name_full
raise OSError(
'{self.scan_title}: could not find detector image file'
)


def get_all_detector_data(self, detector):
"""Return a 2D array of all MCA spectra collected in this scan
by the detector element indicated with `detector`.
def get_all_detector_data(self, detector_prefix):
:param detector: For detector data collected in SPEC format,
this is the detector prefix as it appears in the spec MCA
data file. For detector data collected in H5 format, this
is the index of a particular detector element.
:type detector: Union[str, int]
:rtype: numpy.ndarray
"""
if self.detector_data_format == 'spec':
return self.get_all_detector_data_spec(detector)
elif self.detector_data_format == 'h5':
try:
element_index = int(detector)
except:
raise TypeError(f'{detector} is not an integer element index')
return self.get_all_detector_data_h5(element_index)

def get_all_detector_data_spec(self, detector_prefix):
"""Return a 2D array of all MCA spectra collected by a
detector in the spec MCA file format during the scan.
:param detector_prefix: Detector name at is appears in the
spec MCA file.
:type detector_prefix: str
:returns: 2D array of MCA spectra
:rtype: numpy.ndarray
"""
# This should be easy with pyspec, but there are bugs in
# pyspec for MCA data..... or is the 'bug' from a nonstandard
# implementation of some macro on our end? According to spec
# manual and pyspec code, mca data should always begin w/ '@A'
# In example scans, it begins with '@mca1' instead
# In example scans, it begins with '@{detector_prefix}'
# instead
data = []

with open(self.get_detector_data_file(detector_prefix)) \
as detector_file:
with open(self.get_detector_data_file_spec()) as detector_file:
lines = [line.strip("\\\n") for line in detector_file.readlines()]

num_bins = self.get_detector_num_bins(detector_prefix)
num_bins = self.get_detector_num_bins()

counter = 0
for line in lines:
Expand All @@ -1123,6 +1281,49 @@ def get_all_detector_data(self, detector_prefix):

return np.array(data)

def get_detector_data(self, detector_prefix, scan_step_index:int):
detector_data = self.get_all_detector_data(detector_prefix)
def get_all_detector_data_h5(self, element_index):
"""Return a 2D array of all MCA spectra collected by a
detector in the h5 file format during the scan.
:param element_index: The index of a particualr MCA element to
return data for.
:type element_index: int
:returns: 2D array of MCA spectra
:rtype: numpy.ndarray
"""
from h5py import File
detector_file = self.get_detector_data_file_h5()
with File(detector_file) as h5_file:
# NB: Bug in EPICS IOC captures an extra frame of data at
# the start.
detector_data = h5_file['/entry/data/data'][1:,element_index,:]
return detector_data

def get_detector_data(self, detector, scan_step_index:int):
"""Return a single MCA spectrum for the detector indicated.
:param detector: If this scan collected MCA data in "spec"
format, this is the detector prefix as it appears in the
spec MCA data file. If this scan collected data in .h5
format, this is the index of the detector element of
interest.:type detector: typing.Union[str, int]
:param scan_step_index: Index of the scan step to return the
spectrum from.
:type scan_step_index: int
:returns: A single MCA spectrum
:rtype: numpy.ndarray
"""
detector_data = self.get_all_detector_data(detector)
return detector_data[scan_step_index]

@cache
def list_smb_mca_detector_files_h5(detector_data_path):
"""Return a sorted list of all *.hdf5 files in a directory
:param detector_data_path: Directory to return *.hdf5 files from
:type detector_data_path: str
:returns: Sorted list of detector data filenames
:rtype: list[str]
"""
return sorted(
[f for f in os.listdir(detector_data_path) if f.endswith('.hdf5')])

0 comments on commit 5d98d9a

Please sign in to comment.