Commit
Merge pull request #161 from dvm-shlee/main
Enhancing Modular Architecture and Debugging in BrkRaw: Introduction of New API and App Modules
Showing 38 changed files with 2,748 additions and 2 deletions.
@@ -11,4 +11,7 @@ build
*.egg-info
*.egg-info/*
.DS_Store
tests/tutorials
tests/tutorials
_test*.py
_*.ipynb
_*.log
@@ -0,0 +1,4 @@
from .brkobj import StudyObj
from ..config import ConfigManager

__all__ = [StudyObj, ConfigManager]
@@ -0,0 +1,6 @@
from .base import BaseAnalyzer
from .scaninfo import ScanInfoAnalyzer
from .affine import AffineAnalyzer
from .dataarray import DataArrayAnalyzer

__all__ = [BaseAnalyzer, ScanInfoAnalyzer, AffineAnalyzer, DataArrayAnalyzer]
@@ -0,0 +1,130 @@
from __future__ import annotations
from brkraw.api import helper
from .base import BaseAnalyzer
import numpy as np
from copy import copy
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..brkobj.scan import ScanInfo


SLICEORIENT = {
    0: 'sagital',
    1: 'coronal',
    2: 'axial'
}

SUBJTYPE = ['Biped', 'Quadruped', 'Phantom', 'Other', 'OtherAnimal']
SUBJPOSE = {
    'part': ['Head', 'Foot', 'Tail'],
    'side': ['Supine', 'Prone', 'Left', 'Right']
}


class AffineAnalyzer(BaseAnalyzer):
    def __init__(self, infoobj: 'ScanInfo'):
        infoobj = copy(infoobj)
        if infoobj.image['dim'] == 2:
            xr, yr = infoobj.image['resolution']
            self.resolution = [(xr, yr, zr) for zr in infoobj.slicepack['slice_distances_each_pack']]
        elif infoobj.image['dim'] == 3:
            self.resolution = [infoobj.image['resolution'][:]]
        else:
            raise NotImplementedError
        if infoobj.slicepack['num_slice_packs'] > 1:
            self.affine = [
                self._calculate_affine(infoobj, slicepack_id)
                for slicepack_id in range(infoobj.slicepack['num_slice_packs'])
            ]
        else:
            self.affine = self._calculate_affine(infoobj)

        self.subj_type = infoobj.orientation['subject_type'] if hasattr(infoobj, 'orientation') else None
        self.subj_position = infoobj.orientation['subject_position'] if hasattr(infoobj, 'orientation') else None

    def get_affine(self, subj_type:str|None=None, subj_position:str|None=None):
        subj_type = subj_type or self.subj_type
        subj_position = subj_position or self.subj_position
        if isinstance(self.affine, list):
            affine = [self._correct_orientation(aff, subj_position, subj_type) for aff in self.affine]
        elif isinstance(self.affine, np.ndarray):
            affine = self._correct_orientation(self.affine, subj_position, subj_type)
        return affine

    def _calculate_affine(self, infoobj: 'ScanInfo', slicepack_id:int|None = None):
        sidx = infoobj.orientation['orientation_desc'][slicepack_id].index(2) \
            if slicepack_id else infoobj.orientation['orientation_desc'].index(2)
        slice_orient = SLICEORIENT[sidx]
        resol = self.resolution[slicepack_id] \
            if slicepack_id else self.resolution[0]
        orientation = infoobj.orientation['orientation'][slicepack_id] \
            if slicepack_id else infoobj.orientation['orientation']
        volume_origin = infoobj.orientation['volume_origin'][slicepack_id] \
            if slicepack_id else infoobj.orientation['volume_origin']
        if infoobj.slicepack['reverse_slice_order']:
            slice_distance = infoobj.slicepack['slice_distances_each_pack'][slicepack_id] \
                if slicepack_id else infoobj.slicepack['slice_distances_each_pack']
            volume_origin = self._correct_origin(orientation, volume_origin, slice_distance)
        return self._compose_affine(resol, orientation, volume_origin, slice_orient)

    @staticmethod
    def _correct_origin(orientation, volume_origin, slice_distance):
        new_origin = orientation.dot(volume_origin)
        new_origin[-1] += slice_distance
        return orientation.T.dot(new_origin)

    @staticmethod
    def _compose_affine(resolution, orientation, volume_origin, slice_orient):
        resol = np.array(resolution)
        if slice_orient in ['axial', 'sagital']:
            resol = np.diag(resol)
        else:
            resol = np.diag(resol * np.array([1, 1, -1]))

        rmat = orientation.T.dot(resol)
        return helper.from_matvec(rmat, volume_origin)

    @staticmethod
    def _est_rotate_angle(subj_pose):
        rotate_angle = {'rad_x':0, 'rad_y':0, 'rad_z':0}
        if subj_pose:
            if subj_pose == 'Head_Supine':
                rotate_angle['rad_z'] = np.pi
            elif subj_pose == 'Head_Prone':
                pass
            elif subj_pose == 'Head_Left':
                rotate_angle['rad_z'] = np.pi/2
            elif subj_pose == 'Head_Right':
                rotate_angle['rad_z'] = -np.pi/2
            elif subj_pose in ['Foot_Supine', 'Tail_Supine']:
                rotate_angle['rad_x'] = np.pi
            elif subj_pose in ['Foot_Prone', 'Tail_Prone']:
                rotate_angle['rad_y'] = np.pi
            elif subj_pose in ['Foot_Left', 'Tail_Left']:
                rotate_angle['rad_y'] = np.pi
                rotate_angle['rad_z'] = -np.pi/2
            elif subj_pose in ['Foot_Right', 'Tail_Right']:
                rotate_angle['rad_y'] = np.pi
                rotate_angle['rad_z'] = np.pi/2
            else:
                raise NotImplementedError
        return rotate_angle

    @classmethod
    def _correct_orientation(cls, affine, subj_pose, subj_type):
        cls._inspect_subj_info(subj_pose, subj_type)
        rotate_angle = cls._est_rotate_angle(subj_pose)
        affine = helper.rotate_affine(affine, **rotate_angle)

        if subj_type != 'Biped':
            affine = helper.rotate_affine(affine, rad_x=-np.pi/2, rad_y=np.pi)
        return affine

    @staticmethod
    def _inspect_subj_info(subj_pose, subj_type):
        if subj_pose:
            part, side = subj_pose.split('_')
            assert part in SUBJPOSE['part'], 'Invalid subject position'
            assert side in SUBJPOSE['side'], 'Invalid subject position'
        if subj_type:
            assert subj_type in SUBJTYPE, 'Invalid subject type'
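For orientation, here is a minimal sketch of the affine composition that _compose_affine performs, assuming brkraw.api.helper.from_matvec embeds a 3x3 matrix and a 3-vector translation into a 4x4 homogeneous affine (the resolution, orientation, and origin values below are made up, not taken from a real scan):

# Hedged sketch, not part of the commit: a local from_matvec stands in for the helper.
import numpy as np

def from_matvec(mat, vec):
    # Assumption: brkraw.api.helper.from_matvec builds a 4x4 affine like this.
    affine = np.eye(4)
    affine[:3, :3] = mat
    affine[:3, 3] = vec
    return affine

# Toy inputs standing in for ScanInfo values: 0.2 mm in-plane, 0.5 mm slices,
# identity subject orientation, and an arbitrary volume origin in mm.
resolution = np.array([0.2, 0.2, 0.5])
orientation = np.eye(3)
volume_origin = np.array([-9.6, -9.6, -4.0])

rmat = orientation.T.dot(np.diag(resolution))   # scale the rotation columns by voxel size
affine = from_matvec(rmat, volume_origin)       # 4x4 voxel-to-world matrix
print(affine)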
@@ -0,0 +1,3 @@
class BaseAnalyzer:
    def vars(self):
        return self.__dict__
@@ -0,0 +1,35 @@
from __future__ import annotations
from .base import BaseAnalyzer
import numpy as np
from copy import copy
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..brkobj import ScanInfo
    from io import BufferedReader
    from zipfile import ZipExtFile


class DataArrayAnalyzer(BaseAnalyzer):
    def __init__(self, infoobj: 'ScanInfo', fileobj: BufferedReader|ZipExtFile):
        infoobj = copy(infoobj)
        self._parse_info(infoobj)
        self.buffer = fileobj

    def _parse_info(self, infoobj: 'ScanInfo'):
        if not hasattr(infoobj, 'dataarray'):
            raise AttributeError
        self.slope = infoobj.dataarray['2dseq_slope']
        self.offset = infoobj.dataarray['2dseq_offset']
        self.dtype = infoobj.dataarray['2dseq_dtype']
        self.shape = infoobj.image['shape'][:]
        self.shape_desc = infoobj.image['dim_desc'][:]
        if infoobj.frame_group and infoobj.frame_group['type']:
            self._calc_array_shape(infoobj)

    def _calc_array_shape(self, infoobj: 'ScanInfo'):
        self.shape.extend(infoobj.frame_group['shape'][:])
        self.shape_desc.extend([fgid.replace('FG_', '').lower() for fgid in infoobj.frame_group['id']])

    def get_dataarray(self):
        self.buffer.seek(0)
        return np.frombuffer(self.buffer.read(), self.dtype).reshape(self.shape, order='F')
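A minimal, self-contained sketch of the frombuffer/reshape pattern that get_dataarray relies on; the dtype, shape, slope, and offset values below are placeholders rather than real 2dseq parameters:

# Hedged sketch, not part of the commit.
import io
import numpy as np

dtype, shape = np.dtype('int16'), (4, 4, 2)        # hypothetical 2dseq_dtype / image shape
slope, offset = 2.0, 0.0                           # hypothetical 2dseq_slope / 2dseq_offset

raw = np.arange(np.prod(shape), dtype=dtype)       # stand-in for the 2dseq file content
buffer = io.BytesIO(raw.tobytes())                 # stand-in for the opened 2dseq file object

buffer.seek(0)
data = np.frombuffer(buffer.read(), dtype).reshape(shape, order='F')  # Fortran order, as above
scaled = data * slope + offset                     # the analyzer keeps slope/offset so callers can rescale
print(scaled.shape, scaled.dtype)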
@@ -0,0 +1,57 @@
from __future__ import annotations
from collections import OrderedDict
from brkraw.api import helper
from .base import BaseAnalyzer
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from ..pvobj import PvScan, PvReco, PvFiles


class ScanInfoAnalyzer(BaseAnalyzer):
    """Helps parse metadata from multiple parameter files to make it more human-readable.
    Args:
        pvobj (PvScan): The PvScan object containing acquisition and method parameters.
        reco_id (int, optional): The reconstruction ID. Defaults to None.
    Raises:
        NotImplementedError: If an operation is not implemented.
    """
    def __init__(self,
                 pvobj: 'PvScan'|'PvReco'|'PvFiles',
                 reco_id:int|None = None,
                 debug:bool = False):

        self._set_pars(pvobj, reco_id)
        if not debug:
            self.info_protocol = helper.Protocol(self).get_info()
            if self.visu_pars:
                self._parse_info()

    def _set_pars(self, pvobj: 'PvScan'|'PvReco'|'PvFiles', reco_id: int|None):
        for p in ['acqp', 'method']:
            try:
                vals = getattr(pvobj, p)
            except AttributeError:
                vals = OrderedDict()
            setattr(self, p, vals)
        try:
            visu_pars = pvobj.get_visu_pars(reco_id)
        except FileNotFoundError:
            visu_pars = OrderedDict()
        setattr(self, 'visu_pars', visu_pars)

    def _parse_info(self):
        self.info_dataarray = helper.DataArray(self).get_info()
        self.info_frame_group = helper.FrameGroup(self).get_info()
        self.info_image = helper.Image(self).get_info()
        self.info_slicepack = helper.SlicePack(self).get_info()
        self.info_cycle = helper.Cycle(self).get_info()
        if self.info_image['dim'] > 1:
            self.info_orientation = helper.Orientation(self).get_info()

    def __dir__(self):
        return [attr for attr in self.__dict__.keys() if 'info_' in attr]

    def get(self, key):
        return getattr(self, key) if key in self.__dir__() else None
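A toy illustration of the lookup convention used above (and relied on by ScanObj.get_info below): only attributes prefixed with info_ are exposed through __dir__(), so get() returns None for anything else. The Example class is purely illustrative:

# Hedged sketch, not part of the commit.
class Example:
    def __init__(self):
        self.info_image = {'dim': 2}
        self._private = 'hidden'

    def __dir__(self):
        return [attr for attr in self.__dict__.keys() if 'info_' in attr]

    def get(self, key):
        return getattr(self, key) if key in self.__dir__() else None

ex = Example()
print(dir(ex))                 # ['info_image']
print(ex.get('info_image'))    # {'dim': 2}
print(ex.get('_private'))      # None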
@@ -0,0 +1,4 @@
from .study import StudyObj
from .scan import ScanObj, ScanInfo

__all__ = [StudyObj, ScanObj, ScanInfo]
@@ -0,0 +1,76 @@
from __future__ import annotations
import ctypes
from ..pvobj import PvScan
from ..analyzer import ScanInfoAnalyzer, AffineAnalyzer, DataArrayAnalyzer, BaseAnalyzer


class ScanInfo(BaseAnalyzer):
    def __init__(self):
        self.warns = []

    @property
    def num_warns(self):
        return len(self.warns)


class ScanObj(PvScan):
    def __init__(self, pvscan: 'PvScan', reco_id: int|None = None,
                 loader_address: int|None = None, debug: bool=False):
        super().__init__(pvscan._scan_id,
                         (pvscan._rootpath, pvscan._path),
                         pvscan._contents,
                         pvscan._recos)

        self.reco_id = reco_id
        self._loader_address = loader_address
        self._pvscan_address = id(pvscan)
        self.is_debug = debug
        self.set_info()

    def set_info(self):
        self.info = self.get_info(self.reco_id)

    def get_info(self, reco_id:int, get_analyzer:bool = False):
        infoobj = ScanInfo()
        pvscan = self.retrieve_pvscan()
        analysed = ScanInfoAnalyzer(pvscan, reco_id, self.is_debug)

        if get_analyzer:
            return analysed
        for attr_name in dir(analysed):
            if 'info_' in attr_name:
                attr_vals = getattr(analysed, attr_name)
                setattr(infoobj, attr_name.replace('info_', ''), attr_vals)
                if attr_vals and attr_vals['warns']:
                    infoobj.warns.extend(attr_vals['warns'])
        return infoobj

    def get_affine_info(self, reco_id:int|None = None):
        if reco_id:
            info = self.get_info(reco_id)
        else:
            info = self.info if hasattr(self, 'info') else self.get_info(self.reco_id)
        return AffineAnalyzer(info)

    def get_data_info(self, reco_id: int|None = None):
        reco_id = reco_id or self.avail[0]
        recoobj = self.get_reco(reco_id)
        fileobj = recoobj.get_2dseq()
        info = self.info if hasattr(self, 'info') else self.get_info(self.reco_id)
        return DataArrayAnalyzer(info, fileobj)

    def get_affine(self, reco_id:int|None = None,
                   subj_type:str|None = None, subj_position:str|None = None):
        return self.get_affine_info(reco_id).get_affine(subj_type, subj_position)

    def get_dataarray(self, reco_id: int|None = None):
        return self.get_data_info(reco_id).get_dataarray()

    def retrieve_pvscan(self):
        if self._pvscan_address:
            return ctypes.cast(self._pvscan_address, ctypes.py_object).value

    def retrieve_loader(self):
        if self._loader_address:
            return ctypes.cast(self._loader_address, ctypes.py_object).value
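A small sketch of the address round-trip behind retrieve_pvscan() and retrieve_loader(): id(obj) is stored instead of a direct reference, and ctypes.cast recovers the object later. This is CPython-specific and only safe while the original object is still alive; the Dummy class is just a stand-in:

# Hedged sketch, not part of the commit.
import ctypes

class Dummy:
    pass

obj = Dummy()
address = id(obj)                                         # store the CPython object address
recovered = ctypes.cast(address, ctypes.py_object).value  # dereference it again
print(recovered is obj)   # True, as long as obj has not been garbage collected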
@@ -0,0 +1,45 @@
from __future__ import annotations
from typing import Dict
from ..pvobj import PvDataset
from .scan import ScanObj


class StudyObj(PvDataset):
    def __init__(self, path):
        super().__init__(path)
        self._parse_header()

    def get_scan(self, scan_id, reco_id=None, debug=False):
        """
        Get a scan object by scan ID.
        """
        pvscan = super().get_scan(scan_id)
        return ScanObj(pvscan=pvscan, reco_id=reco_id,
                       loader_address=id(self), debug=debug)

    def _parse_header(self) -> (Dict | None):
        if not self.contents or 'subject' not in self.contents['files']:
            self.header = None
            return
        subj = self.subject
        subj_header = getattr(subj, 'header') if subj.is_parameter() else None
        if title := subj_header['TITLE'] if subj_header else None:
            self.header = {k.replace("SUBJECT_",""):v for k, v in subj.parameters.items() if k.startswith("SUBJECT")}
            self.header['sw_version'] = title.split(',')[-1].strip() if 'ParaVision' in title else "ParaVision < 6"

    @property
    def avail(self):
        return super().avail

    @property
    def info(self):
        """output all analyzed information"""
        info = {'header': None,
                'scans': {}}
        if header := self.header:
            info['header'] = header
        for scan_id in self.avail:
            info['scans'][scan_id] = {}
            scanobj = self.get_scan(scan_id)
            for reco_id in scanobj.avail:
                info['scans'][scan_id][reco_id] = scanobj.get_info(reco_id).vars()
        return info
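A hypothetical end-to-end use of the new API layer introduced by this pull request, assuming the export paths shown in the __init__.py diffs above; the dataset path, scan ID, and reco ID are placeholders, and a single slice pack is assumed so get_affine returns one 4x4 array:

# Hedged sketch, not part of the commit.
from brkraw.api import StudyObj

study = StudyObj('path/to/paravision_study')   # placeholder path to a ParaVision study folder
print(study.info['header'])                    # parsed SUBJECT_* fields, or None if absent

scan = study.get_scan(1, reco_id=1)            # assumes scan 1 / reco 1 exist in the dataset
affine = scan.get_affine(subj_type='Biped', subj_position='Head_Prone')
data = scan.get_dataarray()
print(data.shape, affine.shape)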