Skip to content

Commit

Permalink
add: initial implementation of edd.EddMapReader
Browse files Browse the repository at this point in the history
edd.EddMapReader is a shortcut to automatically generating a valid
common.models.map.MapConfig dictionary from just an EDD-style par file
(post-2023 format) and dataset_id.
  • Loading branch information
keara-soloway committed Feb 6, 2024
1 parent 01041dc commit 5b5a70f
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHAP/edd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""This subpackage contains `PipelineItems` unique to EDD data
processing workflows.
"""
# from CHAP.edd.reader import
from CHAP.edd.reader import EddMapReader
from CHAP.edd.processor import (DiffractionVolumeLengthProcessor,
LatticeParameterRefinementProcessor,
MCACeriaCalibrationProcessor,
Expand Down
161 changes: 161 additions & 0 deletions CHAP/edd/reader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,165 @@
#!/usr/bin/env python
from CHAP.reader import Reader


class EddMapReader(Reader):
    """Reader for taking an EDD-style .par file and returning a
    `MapConfig` representing one of the datasets in the
    file. Independent dimensions are determined automatically, and a
    specific set of items to use for extra scalar datasets to include
    are hard-coded in."""
    def read(self, parfile, dataset_id):
        """Return a dictionary representing a validated `MapConfig`
        for one EDD dataset.

        :param parfile: Name of the EDD-style .par file containing the
            dataset.
        :type parfile: str
        :param dataset_id: Number of the dataset in the .par file
            to return as a map.
        :type dataset_id: int
        :returns: Map configuration dictionary. The dictionary is
            validated by constructing a `MapConfig` from it before it
            is returned.
        :rtype: dict
        :raises Exception: If the scans in the dataset do not all
            share a single scan type.
        """
        import numpy as np
        from CHAP.common.models.map import MapConfig
        from CHAP.utils.parfile import ParFile
        from CHAP.utils.scanparsers import SMBMCAScanParser as ScanParser

        parfile = ParFile(parfile)

        # Get the list of "good" scan numbers belonging to the
        # requested dataset.
        dataset_ids = np.asarray(parfile.get_values('dataset_id'))
        dataset_rows_i = np.argwhere(dataset_ids == dataset_id).flatten()
        scan_nos = [parfile.data[i][parfile.scann_i] for i in dataset_rows_i
                    if parfile.data[i][parfile.scann_i]
                    in parfile.good_scan_numbers()]
        self.logger.debug(f'Scan numbers: {scan_nos}')
        spec_scans = [dict(spec_file=parfile.spec_file, scan_numbers=scan_nos)]

        # Get scan type for this dataset; exactly one scan type per
        # dataset is allowed.
        scan_types = parfile.get_values('scan_type', scan_numbers=scan_nos)
        if any(st != scan_types[0] for st in scan_types):
            msg = 'Only one scan type per dataset is supported.'
            self.logger.error(msg)
            raise Exception(msg)
        scan_type = scan_types[0]
        self.logger.debug(f'Scan type: {scan_type}')

        # Based on scan type, get independent_dimensions for the map.
        # Start by adding labx, laby, labz, and omega. Any "extra"
        # dimensions will be squeezed out of the map later.
        independent_dimensions = [
            dict(label='labx', units='mm', data_type='smb_par',
                 name='labx'),
            dict(label='laby', units='mm', data_type='smb_par',
                 name='laby'),
            dict(label='labz', units='mm', data_type='smb_par',
                 name='labz'),
            dict(label='ometotal', units='degrees', data_type='smb_par',
                 name='ometotal')
        ]
        scalar_data = []
        if scan_type != 0:
            self.logger.warning(
                'Assuming all fly axes parameters are identical for all scans')
            # Map the fly_axis par value (1-4) to a dimension label
            # and its units.
            axes_labels = {1: 'fly_labx', 2: 'fly_laby', 3: 'fly_labz',
                           4: 'fly_ometotal'}
            axes_units = {1: 'mm', 2: 'mm', 3: 'mm', 4: 'degrees'}
            axes_added = []
            scanparser = ScanParser(parfile.spec_file, scan_nos[0])
            def add_fly_axis(fly_axis_index):
                # Append the fly axis at the given index (0 or 1) to
                # independent_dimensions, at most once.
                if fly_axis_index in axes_added:
                    return
                fly_axis_key = scanparser.pars[f'fly_axis{fly_axis_index}']
                independent_dimensions.append(dict(
                    label=axes_labels[fly_axis_key],
                    data_type='spec_motor',
                    units=axes_units[fly_axis_key],
                    name=scanparser.spec_scan_motor_mnes[fly_axis_index]))
                axes_added.append(fly_axis_index)
            add_fly_axis(0)
            if scan_type in (2, 3, 5):
                add_fly_axis(1)
            if scan_type in (4, 5):
                scalar_data.append(dict(
                    label='bin_axis', units='n/a', data_type='smb_par',
                    name='bin_axis'))

        # Add in the usual extra scalar data maps for EDD
        scalar_data.extend([
            dict(label='dataset_id', units='n/a', data_type='smb_par',
                 name='dataset_id'),
            dict(label='config_id', units='n/a', data_type='smb_par',
                 name='config_id'),
            dict(label='SCAN_N', units='n/a', data_type='smb_par',
                 name='SCAN_N'),
            dict(label='rsgap_size', units='mm', data_type='smb_par',
                 name='rsgap_size'),
            dict(label='x_effective', units='mm', data_type='smb_par',
                 name='x_effective'),
            dict(label='z_effective', units='mm', data_type='smb_par',
                 name='z_effective'),
            dict(label='scan_type', units='n/a', data_type='smb_par',
                 name='scan_type')
        ])

        # Construct initial map config dictionary
        scanparser = ScanParser(parfile.spec_file, scan_nos[0])
        map_config_dict = dict(
            title=scanparser.scan_name,
            station='id1a3',
            experiment_type='EDD',
            sample=dict(name=scanparser.scan_name),
            spec_scans=spec_scans,
            independent_dimensions=independent_dimensions,
            scalar_data=scalar_data,
            presample_intensity=dict(name='a3ic1', data_type='scan_column'),
            postsample_intensity=dict(name='diode', data_type='scan_column'),
            dwell_time_actual=dict(name='count_time', data_type='smb_par')
        )
        map_config = MapConfig(**map_config_dict)

        # Squeeze out extraneous independent dimensions (dimensions
        # along which data were taken at only one unique coordinate
        # value). NOTE(review): this assumes map_config.shape is in
        # reverse order relative to independent_dimensions, hence the
        # [::-1] to translate the index -- confirm against MapConfig.
        while 1 in map_config.shape:
            remove_dim_index = map_config.shape[::-1].index(1)
            self.logger.debug(
                'Map dimensions: '
                + str([dim["label"] for dim in independent_dimensions]))
            self.logger.debug(f'Map shape: {map_config.shape}')
            self.logger.debug(
                'Squeezing out independent dimension '
                + independent_dimensions[remove_dim_index]['label'])
            # independent_dimensions is the same list object referenced
            # by map_config_dict, so re-validating picks up the pop.
            independent_dimensions.pop(remove_dim_index)
            map_config = MapConfig(**map_config_dict)
        self.logger.debug(
            'Map dimensions: '
            + str([dim["label"] for dim in independent_dimensions]))
        self.logger.debug(f'Map shape: {map_config.shape}')

        # Add lab coordinates to the map's scalar_data only if they
        # are NOT already one of the squeezed map's
        # independent_dimensions.
        lab_dims = [
            dict(label='labx', units='mm', data_type='smb_par', name='labx'),
            dict(label='laby', units='mm', data_type='smb_par', name='laby'),
            dict(label='labz', units='mm', data_type='smb_par', name='labz'),
            dict(label='ometotal', units='degrees', data_type='smb_par',
                 name='ometotal')
        ]
        for dim in lab_dims:
            if dim not in independent_dimensions:
                scalar_data.append(dim)

        return map_config_dict


if __name__ == '__main__':
from CHAP.reader import main
Expand Down

0 comments on commit 5b5a70f

Please sign in to comment.