Skip to content

Commit

Permalink
1254 fluka output (#691)
Browse files Browse the repository at this point in the history
* Use flair card for Fluka USRBIN

* Update output mock to verify page name

* Fix code style and typos

* Apply suggestions from code review

Co-authored-by: Leszek Grzanka <[email protected]>

* Add mapping function for Fluka USRBIN scoring types

* Make class method

* Update units

* Fix style issues

* Update doc to mute deepsource

* Use axis names similar to ones for shieldhit

* Try fixing deepsource issues

* Remove empty line

* Simplify method

---------

Co-authored-by: Leszek Grzanka <[email protected]>
  • Loading branch information
hendzeld and grzanka authored Nov 26, 2023
1 parent 7919273 commit 3225b5e
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 29 deletions.
143 changes: 116 additions & 27 deletions pymchelper/readers/fluka.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from typing import Optional

import numpy as np

from pymchelper.axis import MeshAxis
from pymchelper.flair.Input import Particle
from pymchelper.page import Page
from pymchelper.readers.common import ReaderFactory, Reader
from pymchelper.flair.Data import Usrbin, UsrTrack, unpackArray, Usrbdx, Resnuclei, Usrxxx
Expand All @@ -14,6 +16,7 @@ class FlukaReaderFactory(ReaderFactory):
"""
Class responsible for discovery of filetype.
"""

def get_reader(self):
"""
Try reading header of Fluka binary file and return a corresponding FlukaReader object
Expand Down Expand Up @@ -54,29 +57,44 @@ def parse_usrbin(self, estimator):
for det_no, detector in enumerate(usr_object.detector):
page = Page(estimator=estimator)
page.title = detector.name

# USRBIN doesn't support differential binning type, only spatial binning is allowed
estimator.x = MeshAxis(n=detector.nx,
min_val=detector.xlow,
max_val=detector.xhigh,
name="X",
name="Position (X)",
unit="cm",
binning=MeshAxis.BinningType.linear)
estimator.y = MeshAxis(n=detector.ny,
min_val=detector.ylow,
max_val=detector.yhigh,
name="Y",
name="Position (Y)",
unit="cm",
binning=MeshAxis.BinningType.linear)
estimator.z = MeshAxis(n=detector.nz,
min_val=detector.zlow,
max_val=detector.zhigh,
name="Z",
name="Position (Z)",
unit="cm",
binning=MeshAxis.BinningType.linear)

page.name = "scorer {}".format(detector.score)
page.unit = ""
# lets check if the detector.score is generalized particle name.
# if that is the case it means we are scoring Fluence for some particle filter
# we do a check by querying Flair DB is the particle name is known
particle_or_scoring_from_id = get_particle_from_db(detector.score)
if particle_or_scoring_from_id:
unit = UsrbinScoring.get_unit_for_scoring(particle_or_scoring_from_id.name)
if unit:
# here we have the case of genuine scorer (like dose, energy, etc.)
page.name = particle_or_scoring_from_id.name
page.unit = unit
else:
# here we have the case of scoring for particles
page.name = f"FLUENCE {particle_or_scoring_from_id.name}"
page.unit = "/cm^2"
else:
# if not present in the database, we returns the scoring id and empty unit
page.name = f"scorer {detector.score}"
page.unit = ""

# unpack detector data
# TODO cross-check if reshaping is needed
Expand All @@ -90,8 +108,8 @@ def parse_usrbin(self, estimator):
return None

def parse_usrbdx(self, estimator):
"""
USRBDX defines a detector for a boundary crossing fluence or current estimator
"""USRBDX defines a detector for a boundary crossing fluence or current estimator.
:param estimator: an Estimator object, will be modified here and filled with data
"""
try:
Expand Down Expand Up @@ -120,20 +138,22 @@ def parse_usrbdx(self, estimator):

# USRBDX doesn't support spatial (XYZ) binning type
# USRBDX provides double differential binning, first axis is kinetic energy (in GeV)
page.diff_axis1 = MeshAxis(n=detector.ne, # number of energy intervals for scoring
min_val=detector.elow, # minimum kinetic energy for scoring (GeV)
max_val=detector.ehigh, # maximum kinetic energy for scoring (GeV)
name="kinetic energy",
unit="GeV",
binning=energy_binning)
page.diff_axis1 = MeshAxis(
n=detector.ne, # number of energy intervals for scoring
min_val=detector.elow, # minimum kinetic energy for scoring (GeV)
max_val=detector.ehigh, # maximum kinetic energy for scoring (GeV)
name="kinetic energy",
unit="GeV",
binning=energy_binning)

# second axis is solid angle (in steradians)
page.diff_axis2 = MeshAxis(n=detector.na, # number of angular bins
min_val=detector.alow, # minimum solid angle for scoring
max_val=detector.ahigh, # maximum solid angle for scoring
name="solid angle",
unit="sr",
binning=angle_binning)
page.diff_axis2 = MeshAxis(
n=detector.na, # number of angular bins
min_val=detector.alow, # minimum solid angle for scoring
max_val=detector.ahigh, # maximum solid angle for scoring
name="solid angle",
unit="sr",
binning=angle_binning)

# detector.fluence corresponds to i2 in WHAT(1) in first card of USBDX
if detector.fluence == 1:
Expand Down Expand Up @@ -173,7 +193,7 @@ def parse_usrtrack(self, estimator):
for det_no, detector in enumerate(usr_object.detector):
page = Page(estimator=estimator)
page.title = detector.name
page.volume = detector.volume # volume of the detector in cm**3
page.volume = detector.volume # volume of the detector in cm**3

# USRTRACK doesn't support spatial (XYZ) binning type
if detector.type == 1:
Expand All @@ -184,12 +204,13 @@ def parse_usrtrack(self, estimator):
return Exception("Invalid binning type")

# USRTRACK provides single differential binning, with diff axis in kinetic energy (in GeV)
page.diff_axis1 = MeshAxis(n=detector.ne, # number of energy intervals for scoring
min_val=detector.elow, # minimum kinetic energy for scoring (GeV)
max_val=detector.ehigh, # maximum kinetic energy for scoring (GeV)
name="kinetic energy",
unit="GeV",
binning=energy_binning)
page.diff_axis1 = MeshAxis(
n=detector.ne, # number of energy intervals for scoring
min_val=detector.elow, # minimum kinetic energy for scoring (GeV)
max_val=detector.ehigh, # maximum kinetic energy for scoring (GeV)
name="kinetic energy",
unit="GeV",
binning=energy_binning)

page.name = "fluence"
page.unit = "cm-2 GeV-1"
Expand Down Expand Up @@ -268,3 +289,71 @@ def read_data(self, estimator, nscale=1):

estimator.file_format = 'fluka_binary'
return True


def get_particle_from_db(particle_id: int) -> Optional[Particle]:
"""Get particle from Flair database by its id"""
try:
Particle.makeLists()
particle = Particle.get(particle_id)
return particle
except KeyError:
return None


class UsrbinScoring:
"""Scoring names for USRBIN estimator"""

_deposition_scorings = [
'ENERGY', 'EM-ENRGY', 'DOSE', 'UNB-ENER', 'UNB-EMEN', 'NIEL-DEP', 'DPA-SCO', 'DOSE-EM', 'DOSEQLET', 'RES-NIEL'
]
_fission_density_scorings = ['FISSIONS', 'HE-FISS', 'LE-FISS']
_neutron_balance_desnity_scorings = ['NEU-BALA']
_density_of_momentum_scorings = ['X-MOMENT', 'Y-MOMENT', 'Z-MOMENT']
_activity_scorings = ['ACTIVITY', 'ACTOMASS']
_dose_equivalent_scorings = ['DOSE-EQ']
_fluence_weighted_bdf_scorings = ['SI1MEVNE']
_he_tn_fluence_scorings = ['HEHAD-EQ', 'THNEU-EQ']
_net_charge_scorings = ['NET-CHRG']

@classmethod
def get_unit_for_scoring(cls, scoring: str) -> str:
"""Get unit for scoring name.
Based on:
- (1) https://flukafiles.web.cern.ch/manual/chapters/particle_and_material_codes/particles_codes.html
- (2) https://flukafiles.web.cern.ch/manual/chapters/description_input/description_options/usrbin.html
:param scoring: scoring name
:return: tuple of scoring and unit
"""
if scoring in cls._deposition_scorings:
if scoring == 'DPA-SCO':
return '/g'
if 'DOSE' in scoring:
return 'GeV/g '
return 'GeV'
if scoring in cls._fission_density_scorings:
return 'fissions/cm^3'
if scoring in cls._neutron_balance_desnity_scorings:
return 'neutrons/cm^3'
if scoring in cls._density_of_momentum_scorings:
return 'cm^-2'
if scoring in cls._activity_scorings:
# This is not totally true, see ACTIVITY and ACTOMASS from 1st link
if scoring == 'ACTIVITY':
return 'Bq/cm^3'
if scoring == 'ACTOMASS':
return 'Bq/g'
return ''
if scoring in cls._dose_equivalent_scorings:
return 'pSv'
if scoring in cls._fluence_weighted_bdf_scorings:
return 'GeV/cm^3'
if scoring in cls._he_tn_fluence_scorings:
return 'cm-2'
if scoring in cls._net_charge_scorings:
return 'C/cm^3'

# if unknown scoring, return empty string
return ''
45 changes: 43 additions & 2 deletions tests/mock/test_fluka_minimal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import numpy as np

import pytest
from pymchelper.axis import MeshAxis
from pymchelper.estimator import Estimator


Expand All @@ -33,7 +34,22 @@ def expected_results() -> dict:
[0., 0.00146468, 0.00169978, 0.],
[0.00090805, 0., 0., 0.],
[0, 0., 0., 0.]
]
],
"name": "ENERGY",
"axis": {
"x": {
"name": "Position (X)",
"unit": "cm",
},
"y": {
"name": "Position (Y)",
"unit": "cm",
},
"z": {
"name": "Position (Z)",
"unit": "cm",
},
}
},
"minimal001_fort.22": {
"shape": [4, 4, 1],
Expand All @@ -42,7 +58,22 @@ def expected_results() -> dict:
[0., 0.00188021, 0.00166488, 0.],
[0., 0.0010242, 0.00105254, 0.],
[0., 0., 0., 0.]
]
],
"name": "ENERGY",
"axis": {
"x": {
"name": "Position (X)",
"unit": "cm",
},
"y": {
"name": "Position (Y)",
"unit": "cm",
},
"z": {
"name": "Position (Z)",
"unit": "cm",
},
}
}
}

Expand All @@ -66,12 +97,22 @@ def test_fluka_mock(tmp_path: Path, output_file: str, expected_results: dict, fl
def __verify_fluka_file(actual_result: Estimator, expected_result: dict):
"""Compares content of generated fluka file with expected values"""
assert expected_result["shape"] == [actual_result.x.n, actual_result.y.n, actual_result.z.n]
assert len(actual_result.pages) == 1
assert expected_result['name'] in actual_result.pages[0].name
for axis_name, value in expected_result["axis"].items():
verify_axis(actual_result.__getattribute__(axis_name), value)

expected = list(np.around(np.array(expected_result["4x4"]).flatten(), 4))
result = list(np.around(np.array(actual_result.pages[0].data).flatten(), 4))
assert expected == result, "Fluka data does not match expected values"


def verify_axis(axis: MeshAxis, expected_axis: dict):
"""Compares axis of generated fluka file with expected values"""
assert axis.name == expected_axis["name"]
assert axis.unit == expected_axis["unit"]


def append_path_to_environ(path: Path) -> dict:
"""Append path to PATH environment variable"""
env = os.environ.copy()
Expand Down

0 comments on commit 3225b5e

Please sign in to comment.