diff --git a/lute/execution/executor.py b/lute/execution/executor.py
index 4535aa5e..04dd5619 100644
--- a/lute/execution/executor.py
+++ b/lute/execution/executor.py
@@ -1151,7 +1151,7 @@ def _submit_cmd(self, executable_path: str, params: str) -> str:
         nprocs: int = max(
             int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
         )
-        mpi_cmd: str = f"mpirun -np {nprocs}"
+        mpi_cmd: str = f"mpirun -np {nprocs} --map-by core"
         if __debug__:
             py_cmd = f"python -B -u -m mpi4py.run {executable_path} {params}"
         else:
diff --git a/lute/execution/logging.py b/lute/execution/logging.py
index 0edf0280..7890b0c7 100644
--- a/lute/execution/logging.py
+++ b/lute/execution/logging.py
@@ -94,10 +94,22 @@ def get_logger(name: str, is_task: bool = True) -> logging.Logger:
             other_logger, logging.PlaceHolder
         ):
             other_logger.disabled = True
+        elif "pyFAI" in other_name and not isinstance(
+            other_logger, logging.PlaceHolder
+        ):
+            other_logger.disabled = True
+        elif "goniometer" in other_name and not isinstance(
+            other_logger, logging.PlaceHolder
+        ):
+            other_logger.disabled = True
         elif "numpy" in other_name and not isinstance(
             other_logger, logging.PlaceHolder
         ):
             other_logger.setLevel(logging.CRITICAL)
+        elif "numexpr.utils" in other_name and not isinstance(
+            other_logger, logging.PlaceHolder
+        ):
+            other_logger.setLevel(logging.WARNING)
     logger: logging.Logger = logging.getLogger(name)
     logger.propagate = False
     handler: logging.Handler
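Note: the logger housekeeping above silences chatty third-party loggers by name, which is the pattern `get_logger` already relies on for `matplotlib`/`numpy`. A minimal standalone sketch of the same idea (prefix names here are illustrative, not LUTE's full list):

```python
import logging

def quiet_third_party(prefixes=("pyFAI", "goniometer"), demote=("numexpr.utils",)):
    """Disable or demote noisy third-party loggers already registered."""
    for name, obj in logging.root.manager.loggerDict.items():
        # PlaceHolder entries are not real Logger instances; skip them.
        if isinstance(obj, logging.PlaceHolder):
            continue
        if any(p in name for p in prefixes):
            obj.disabled = True            # drop all records outright
        elif any(p in name for p in demote):
            obj.setLevel(logging.WARNING)  # keep warnings, drop INFO/DEBUG chatter

quiet_third_party()
```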
diff --git a/lute/io/models/__init__.py b/lute/io/models/__init__.py
index dddc8343..f98322a4 100644
--- a/lute/io/models/__init__.py
+++ b/lute/io/models/__init__.py
@@ -11,3 +11,4 @@
 from .tests import *
 from .mpi_tests import *
 from .geometry import *
+from .geom_opt import *
diff --git a/lute/io/models/geom_opt.py b/lute/io/models/geom_opt.py
new file mode 100644
index 00000000..9323222b
--- /dev/null
+++ b/lute/io/models/geom_opt.py
@@ -0,0 +1,160 @@
+"""Models for optimizing detector geometry using PyFAI and Bayesian optimization.
+
+Classes:
+    - OptimizePyFAIGeometryParameters(TaskParameters):
+        Parameters for optimizing detector geometry using PyFAI and Bayesian
+        optimization.
+"""
+
+__all__ = ["OptimizePyFAIGeometryParameters"]
+__author__ = "Louis Conreux"
+
+from typing import Any, Dict, Optional, Union, Tuple
+
+from pydantic import BaseModel, Field, validator
+
+from lute.io.models.base import TaskParameters
+from lute.io.models.validators import validate_smd_path, validate_calib_path
+
+
+class OptimizePyFAIGeometryParameters(TaskParameters):
+    """Parameters for optimizing detector geometry using PyFAI and Bayesian optimization.
+
+    The Bayesian optimization has default hyperparameters that can be
+    overridden by the user.
+    """
+
+    class Config(TaskParameters.Config):
+        set_result: bool = True
+        """Whether the Executor should mark a specified parameter as a result."""
+
+    class BayesGeomOptParameters(BaseModel):
+        """Bayesian optimization hyperparameters."""
+
+        bounds: Dict[str, Union[float, Tuple[float, float]]] = Field(
+            {
+                "dist": (0.02, 0.6),
+                "poni1": (-0.01, 0.01),
+                "poni2": (-0.01, 0.01),
+            },
+            description="Bounds defining the parameter search space for the Bayesian optimization.",
+        )
+
+        res: Optional[float] = Field(
+            None,
+            description="Resolution of the grid used to discretize the parameter search space.",
+        )
+
+        max_rings: int = Field(
+            5,
+            description="Maximum number of rings to be used for the Bayesian optimization.",
+        )
+
+        n_samples: Optional[int] = Field(
+            50,
+            description="Number of random starts to initialize the Bayesian optimization.",
+        )
+
+        n_iterations: Optional[int] = Field(
+            50,
+            description="Number of iterations to run the Bayesian optimization.",
+        )
+
+        prior: Optional[bool] = Field(
+            True,
+            description="Whether to use a Gaussian prior centered on the search space for the Bayesian optimization, or to pick samples randomly.",
+        )
+
+        af: Optional[str] = Field(
+            "ucb",
+            description="Acquisition function to be used by the Bayesian optimization.",
+        )
+
+        hyperparams: Optional[Dict[str, float]] = Field(
+            {
+                "beta": 1.96,
+                "epsilon": 0.01,
+            },
+            description="Hyperparameters for the acquisition function.",
+        )
+
+        seed: Optional[int] = Field(
+            None,
+            description="Seed for the random number generator, for reproducibility.",
+        )
+
+    _find_smd_path = validate_smd_path("powder")
+
+    _find_in_file_path = validate_calib_path("in_file")
+
+    exp: str = Field(
+        "",
+        description="Experiment name.",
+    )
+
+    run: Optional[Union[str, int]] = Field(
+        None,
+        description="Run number.",
+    )
+
+    det_type: str = Field(
+        "",
+        description=(
+            "Detector type. Currently supported: 'ePix10k2M', 'ePix10kaQuad', "
+            "'Rayonix', 'Jungfrau1M', 'Jungfrau4M'."
+        ),
+    )
+
+    work_dir: str = Field(
+        "",
+        description="Main working directory for LUTE.",
+    )
+
+    in_file: str = Field(
+        "",
+        description="Path to the input .data file containing the detector geometry info to be calibrated.",
+    )
+
+    powder: str = Field(
+        "",
+        description="Powder diffraction pattern to be used for the calibration.",
+    )
+
+    calibrant: str = Field(
+        "",
+        description="Calibrant used for the calibration. Must be one of the calibrants supported by pyFAI: https://github.com/silx-kit/pyFAI/tree/main/src/pyFAI/resources/calibration",
+    )
+
+    out_file: str = Field(
+        "",
+        description="Path to the output .data file containing the optimized detector geometry.",
+        is_result=True,
+    )
+
+    bo_params: BayesGeomOptParameters = Field(
+        BayesGeomOptParameters(),
+        description="Bayesian optimization parameters: bounds and resolution defining the search space, plus acquisition-function hyperparameters.",
+    )
+
+    @validator("exp", always=True)
+    def validate_exp(cls, exp: str, values: Dict[str, Any]) -> str:
+        if not exp:
+            exp = values["lute_config"].experiment
+        return exp
+
+    @validator("run", always=True)
+    def validate_run(
+        cls, run: Union[str, int], values: Dict[str, Any]
+    ) -> Union[str, int]:
+        if not run:
+            run = values["lute_config"].run
+        return run
+
+    @validator("work_dir", always=True)
+    def validate_work_dir(cls, work_dir: str, values: Dict[str, Any]) -> str:
+        if not work_dir:
+            work_dir = values["lute_config"].work_dir
+        return work_dir
+
+    @validator("out_file", always=True)
+    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
+        if not out_file:
+            in_file = values["in_file"]
+            run = values["run"]
+            out_file = in_file.replace("0-end.data", f"{run}-end.data")
+        return out_file
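Note: the `always=True` validators above implement a fill-from-config default pattern. A minimal, self-contained sketch of the same idea (the `Params` model below is illustrative, not LUTE's actual `TaskParameters`):

```python
from typing import Any, Dict
from pydantic import BaseModel, validator

class Params(BaseModel):
    run: int
    in_file: str
    out_file: str = ""

    @validator("out_file", always=True)
    def default_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        # Mirror the pattern above: derive the output name from the input
        # geometry file and the run number when the user leaves it empty.
        if not out_file:
            out_file = values["in_file"].replace(
                "0-end.data", f"{values['run']}-end.data"
            )
        return out_file

print(Params(run=42, in_file="/calib/geometry/0-end.data").out_file)
# -> /calib/geometry/42-end.data
```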
diff --git a/lute/io/models/validators.py b/lute/io/models/validators.py
index 569afc58..f39305e4 100644
--- a/lute/io/models/validators.py
+++ b/lute/io/models/validators.py
@@ -5,8 +5,8 @@
 parameters for validation.
 """
 
-__all__ = ["template_parameter_validator", "validate_smd_path"]
-__author__ = "Gabriel Dorlhiac"
+__all__ = ["template_parameter_validator", "validate_smd_path", "validate_calib_path"]
+__author__ = ["Gabriel Dorlhiac", "Louis Conreux"]
 
 import os
 from typing import Dict, Any, Optional
@@ -15,6 +15,9 @@
 
 from lute.io.db import read_latest_db_entry
 
+import psana  # type: ignore
+from PSCalib.CalibFileFinder import find_calib_file  # type: ignore
+
 
 def template_parameter_validator(template_params_name: str):
     """Populates a TaskParameters model with a set of validated TemplateParameters.
@@ -68,3 +71,31 @@ def _validate_smd_path(cls, smd_path: str, values: Dict[str, Any]) -> str:
         return smd_path
 
     return validator(smd_path_name, always=True, allow_reuse=True)(_validate_smd_path)
+
+
+def validate_calib_path(calib_path_name: str):
+    """Find the path to a valid geometry calibration file or raise an error."""
+
+    def _validate_calib_path(cls, calib_path: str, values: Dict[str, Any]) -> str:
+        if calib_path == "":
+            exp: str = values["lute_config"].experiment
+            run: int = int(values["lute_config"].run)
+            try:
+                det_type: str = values["det_type"]
+            except KeyError:
+                det_type = values["detname"]
+            cdir = f"/sdf/data/lcls/ds/{exp[:3]}/{exp}/calib"
+            ds_args = f"exp={exp}:run={run}:idx"
+            ds = psana.DataSource(ds_args)
+            det = psana.Detector(det_type, ds.env())
+            src = str(det.name)
+            calib_type = "geometry"
+            calib_path = find_calib_file(cdir, src, calib_type, run, pbits=1)
+            if calib_path is None:
+                raise ValueError(f"No calibration file found for {det_type} in {cdir}")
+
+        return calib_path
+
+    return validator(calib_path_name, always=True, allow_reuse=True)(
+        _validate_calib_path
+    )
diff --git a/lute/managed_tasks.py b/lute/managed_tasks.py
index 2b93397e..e44c5ec4 100644
--- a/lute/managed_tasks.py
+++ b/lute/managed_tasks.py
@@ -130,3 +130,8 @@
 
 PeakFinderPsocake: Executor = Executor("FindPeaksPsocake")
 """Performs Bragg peak finding using psocake - *DEPRECATED*."""
+
+# PyFAI
+#######
+PyFAIGeometryOptimizer: MPIExecutor = MPIExecutor("OptimizePyFAIGeometry")
+"""Optimize detector geometry using PyFAI coupled with Bayesian Optimization."""
diff --git a/lute/tasks/__init__.py b/lute/tasks/__init__.py
index 71ae8de2..6fd6b8ee 100644
--- a/lute/tasks/__init__.py
+++ b/lute/tasks/__init__.py
@@ -86,6 +86,11 @@ def import_task(task_name: str) -> Type[Task]:
 
         return TestMultiNodeCommunication
 
+    if task_name == "OptimizePyFAIGeometry":
+        from .geom_opt import OptimizePyFAIGeometry
+
+        return OptimizePyFAIGeometry
+
     if task_name == "OptimizeAgBhGeometryExhaustive":
         from .geometry import OptimizeAgBhGeometryExhaustive
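Note: `import_task` keeps heavyweight imports lazy; a Task class is only imported when its managed task actually runs. Usage is straightforward (sketch; assumes a psana-capable environment, since `lute.tasks.geom_opt` imports psana and pyFAI at module level):

```python
from lute.tasks import import_task

# Resolve the string name used by the Executor to the concrete Task class,
# importing lute.tasks.geom_opt (and its dependencies) only at this point.
TaskClass = import_task("OptimizePyFAIGeometry")
print(TaskClass.__name__)  # -> OptimizePyFAIGeometry
```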
diff --git a/lute/tasks/geom_opt.py b/lute/tasks/geom_opt.py
new file mode 100644
index 00000000..dce01fd7
--- /dev/null
+++ b/lute/tasks/geom_opt.py
@@ -0,0 +1,1075 @@
+"""
+Classes for geometry optimization tasks.
+
+Classes:
+    OptimizePyFAIGeometry: Optimize detector geometry using PyFAI coupled with
+        Bayesian optimization.
+"""
+
+__all__ = ["OptimizePyFAIGeometry"]
+__author__ = "Louis Conreux"
+
+from lute.io.models.geom_opt import OptimizePyFAIGeometryParameters
+from lute.tasks.task import Task
+from lute.tasks.dataclasses import TaskStatus, ElogSummaryPlots
+from lute.execution.logging import get_logger
+
+import psana  # type: ignore
+import os
+import logging
+from typing import Optional, Tuple
+import sys
+
+sys.path.append("/sdf/home/l/lconreux/LCLSGeom")
+from LCLSGeom.swap_geom import (  # type: ignore
+    PsanaToPyFAI,
+    PyFAIToCrystFEL,
+    CrystFELToPsana,
+    get_beam_center,
+)
+
+import h5py  # type: ignore
+import panel as pn  # type: ignore
+import numpy as np
+import numpy.typing as npt
+import matplotlib.pyplot as plt  # type: ignore
+import pyFAI  # type: ignore
+from pyFAI.geometry import Geometry  # type: ignore
+from pyFAI.goniometer import SingleGeometry  # type: ignore
+from pyFAI.azimuthalIntegrator import AzimuthalIntegrator  # type: ignore
+from pyFAI.calibrant import CALIBRANT_FACTORY  # type: ignore
+from sklearn.gaussian_process import GaussianProcessRegressor  # type: ignore
+from sklearn.gaussian_process.kernels import RBF, ConstantKernel, WhiteKernel  # type: ignore
+from sklearn.utils._testing import ignore_warnings  # type: ignore
+from sklearn.exceptions import ConvergenceWarning  # type: ignore
+from scipy.stats import norm  # type: ignore
+from mpi4py import MPI
+
+pyFAI.use_opencl = False  # type: ignore
+
+logger: logging.Logger = get_logger(__name__)
+
+
+class BayesGeomOpt:
+    """
+    Class to perform geometry optimization using Bayesian optimization on pyFAI.
+
+    Parameters
+    ----------
+    exp : str
+        Experiment name
+    run : int
+        Run number
+    det_type : str
+        Detector type
+    detector : PyFAI(Detector)
+        PyFAI detector object
+    calibrant : str
+        Calibrant name
+    """
+
+    def __init__(
+        self,
+        exp,
+        run,
+        det_type,
+        detector,
+        calibrant,
+    ):
+        self.exp = exp
+        self.run = run
+        self.det_type = det_type
+        self.comm = MPI.COMM_WORLD
+        self.rank = self.comm.Get_rank()
+        self.size = self.comm.Get_size()
+        self.detector = detector
+        self.calibrant = calibrant
+        self.order = ["dist", "poni1", "poni2", "rot1", "rot2", "rot3"]
+        self.space = ["poni1", "poni2"]
+        self.values = {
+            "dist": 0.1,
+            "poni1": 0,
+            "poni2": 0,
+            "rot1": 0,
+            "rot2": 0,
+            "rot3": 0,
+        }
+
+    @staticmethod
+    def expected_improvement(X, gp_model, best_y, epsilon=0):
+        y_pred, y_std = gp_model.predict(X, return_std=True)
+        z = (y_pred - best_y + epsilon) / y_std
+        # The improvement term must be parenthesized; otherwise only best_y
+        # is scaled by the CDF.
+        ei = (y_pred - best_y + epsilon) * norm.cdf(z) + y_std * norm.pdf(z)
+        return ei
+
+    @staticmethod
+    def upper_confidence_bound(X, gp_model, best_y=None, beta=1.96):
+        y_pred, y_std = gp_model.predict(X, return_std=True)
+        ucb = y_pred + beta * y_std
+        return ucb
+
+    @staticmethod
+    def probability_of_improvement(X, gp_model, best_y, epsilon=0):
+        y_pred, y_std = gp_model.predict(X, return_std=True)
+        z = (y_pred - best_y + epsilon) / y_std
+        pi = norm.cdf(z)
+        return pi
+
+    @staticmethod
+    def contextual_improvement(X, gp_model, best_y, hyperparam=None):
+        y_pred, y_std = gp_model.predict(X, return_std=True)
+        cv = np.mean(y_std**2) / best_y
+        z = (y_pred - best_y + cv) / y_std
+        # Same parenthesization fix as in expected_improvement.
+        ci = (y_pred - best_y + cv) * norm.cdf(z) + y_std * norm.pdf(z)
+        return ci
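Note: a quick way to sanity-check the acquisition functions above is to fit a GP on a few 1-D samples and evaluate them on a grid. A minimal, self-contained sketch (toy data; none of these values come from the experiment):

```python
import numpy as np
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF

rng = np.random.default_rng(0)
X_train = rng.uniform(-1, 1, size=(8, 1))
y_train = np.sin(3 * X_train).ravel()

gp = GaussianProcessRegressor(kernel=RBF(length_scale=0.3)).fit(X_train, y_train)
X_grid = np.linspace(-1, 1, 200).reshape(-1, 1)
mu, sigma = gp.predict(X_grid, return_std=True)

best_y = y_train.max()
ucb = mu + 1.96 * sigma                                  # upper_confidence_bound
z = (mu - best_y) / sigma
ei = (mu - best_y) * norm.cdf(z) + sigma * norm.pdf(z)   # expected_improvement, epsilon=0
print(X_grid[np.argmax(ucb)], X_grid[np.argmax(ei)])     # candidate next samples
```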
+
+    def set_wavelength_calibrant(self):
+        """
+        Define the calibrant for optimization with the appropriate wavelength.
+
+        The wavelength is derived from the EBeam photon energy of the first
+        event, falling back to the SIOC:SYS0:ML00:AO192 EPICS value when the
+        photon energy is missing or infinite.
+        """
+        self.calibrant_name = self.calibrant
+        calibrant = CALIBRANT_FACTORY(self.calibrant)
+        ds_args = f"exp={self.exp}:run={self.run}:idx"
+        ds = psana.DataSource(ds_args)
+        runner = next(ds.runs())
+        evt = runner.event(runner.times()[0])
+        photon_energy = None
+        try:
+            photon_energy = psana.Detector("EBeam").get(evt).ebeamPhotonEnergy()
+        except AttributeError:
+            logger.warning("Event lacking an ebeamPhotonEnergy value.")
+        if photon_energy is None or np.isinf(photon_energy):
+            self.wavelength = ds.env().epicsStore().value("SIOC:SYS0:ML00:AO192") * 1e-9
+        else:
+            self.wavelength = 1.23984197386209e-06 / photon_energy
+        self.photon_energy = photon_energy
+        calibrant.wavelength = self.wavelength
+        self.calibrant = calibrant
+
+    def build_mask(self, central=True, edges=True):
+        """
+        Mask pixels marked as bad status, as well as panel edges and central pixels.
+
+        Parameters
+        ----------
+        central : bool
+            Mask central pixels of panels
+        edges : bool
+            Mask edges of panels
+        """
+        ds_args = f"exp={self.exp}:run={self.run}:idx"
+        ds = psana.DataSource(ds_args)
+        det = psana.Detector(self.det_type, ds.env())
+        runner = next(ds.runs())
+        evt = runner.event(runner.times()[0])
+        runnum = evt.run()
+        try:
+            mask = det.mask_v2(par=runnum, central=central, edges=edges)
+        except AttributeError:
+            mask = None
+        if mask is not None:
+            if len(mask.shape) != 2:
+                mask = np.reshape(mask, (mask.shape[0] * mask.shape[1], mask.shape[2]))
+        return mask
+
+    def min_intensity(self, powder):
+        """
+        Define the minimal intensity for control-point extraction.
+
+        The threshold is chosen to maximize the signal-to-noise ratio (SNR),
+        where signal is the standard deviation of pixels above the threshold
+        and noise is the standard deviation of pixels below it.
+
+        Parameters
+        ----------
+        powder : np.ndarray
+            Powder image
+        """
+        mean = np.mean(powder)
+        threshold = mean + 5 * np.std(powder)
+        if self.rank == 0:
+            logger.info(f"Threshold for pixel outliers: {threshold:.2e}")
+        nice_pix = powder < threshold
+        SNRs = []
+        Imins = np.arange(95, 100, 0.1)
+        for Imin in Imins:
+            threshold = np.percentile(powder[nice_pix], Imin)
+            signal_pixels = powder[nice_pix][powder[nice_pix] > threshold]
+            signal = np.std(signal_pixels)
+            noise_pixels = powder[nice_pix][powder[nice_pix] <= threshold]
+            noise = np.std(noise_pixels)
+            SNRs.append(signal / noise)
+        q = Imins[np.argmax(SNRs)]
+        Imin = np.percentile(powder[nice_pix], q)
+        self.q = round(q, 1)
+        self.Imin = Imin
+        self.powder = powder
+        return Imin
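Note: stripped of the psana plumbing, the percentile scan in `min_intensity` is just a selection rule over candidate cutoffs. A toy illustration (synthetic array, not detector data):

```python
import numpy as np

rng = np.random.default_rng(1)
powder = rng.gamma(2.0, 50.0, size=100_000)  # stand-in for a powder image

nice_pix = powder < powder.mean() + 5 * powder.std()  # drop hot-pixel outliers
percentiles = np.arange(95, 100, 0.1)
snrs = []
for q in percentiles:
    cut = np.percentile(powder[nice_pix], q)
    signal = powder[nice_pix][powder[nice_pix] > cut].std()
    noise = powder[nice_pix][powder[nice_pix] <= cut].std()
    snrs.append(signal / noise)

best_q = percentiles[np.argmax(snrs)]
Imin = np.percentile(powder[nice_pix], best_q)
print(f"Keep pixels above the {best_q:.1f}th percentile (I > {Imin:.1f})")
```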
+
+    @ignore_warnings(category=ConvergenceWarning)
+    def bayes_opt_center(
+        self,
+        powder,
+        dist,
+        bounds,
+        res,
+        Imin=90,
+        max_rings=5,
+        n_samples=50,
+        n_iterations=50,
+        af="ucb",
+        hyperparam=None,
+        prior=True,
+        seed=None,
+    ):
+        """
+        Perform Bayesian optimization on the PONI center parameters at a fixed distance.
+
+        Parameters
+        ----------
+        powder : np.ndarray
+            Powder image
+        dist : float
+            Fixed sample-detector distance
+        bounds : dict
+            Dictionary of bounds for each parameter
+        res : float
+            Resolution of the grid used to discretize the parameter search space
+        Imin : float
+            Minimum intensity threshold for control-point extraction, based on
+            the intensity-distribution percentile
+        max_rings : int
+            Maximum number of rings to use for control-point extraction
+        n_samples : int
+            Number of samples used to initialize the GP model
+        n_iterations : int
+            Number of iterations for optimization
+        af : str
+            Acquisition function to use for optimization
+        hyperparam : dict
+            Dictionary of hyperparameters for the acquisition function
+        prior : bool
+            Use prior information for optimization
+        seed : int
+            Random seed for reproducibility
+        """
+
+        if seed is not None:
+            np.random.seed(seed)
+        self.values["dist"] = dist
+        if res is None:
+            res = self.detector.pixel_size
+
+        inputs = {}
+        norm_inputs = {}
+        for p in self.order:
+            if p in self.space:
+                inputs[p] = np.arange(bounds[p][0], bounds[p][1] + res, res)
+                norm_inputs[p] = inputs[p]
+            else:
+                inputs[p] = np.array([self.values[p]])
+        X = np.array(np.meshgrid(*[inputs[p] for p in self.order])).T.reshape(
+            -1, len(self.order)
+        )
+        X_space = np.array(
+            np.meshgrid(*[norm_inputs[p] for p in self.space])
+        ).T.reshape(-1, len(self.space))
+        X_norm = (X_space - np.mean(X_space, axis=0)) / (
+            np.max(X_space, axis=0) - np.min(X_space, axis=0)
+        )
+        if prior:
+            means = np.mean(X_space, axis=0)
+            cov = np.diag(
+                [
+                    ((bounds[param][1] - bounds[param][0]) / 5) ** 2
+                    for param in self.space
+                ]
+            )
+            X_samples = np.random.multivariate_normal(means, cov, n_samples)
+            X_norm_samples = (X_samples - np.mean(X_space, axis=0)) / (
+                np.max(X_space, axis=0) - np.min(X_space, axis=0)
+            )
+            for p in self.order:
+                if p not in self.space:
+                    idx = self.order.index(p)
+                    X_samples = np.insert(X_samples, idx, self.values[p], axis=1)
+        else:
+            idx_samples = np.random.choice(X.shape[0], n_samples)
+            X_samples = X[idx_samples]
+            X_norm_samples = X_norm[idx_samples]
+
+        bo_history = {}
+        y = np.zeros(n_samples)
+        for i in range(n_samples):
+            dist, poni1, poni2, rot1, rot2, rot3 = X_samples[i]
+            geom_initial = Geometry(
+                dist=dist,
+                poni1=poni1,
+                poni2=poni2,
+                rot1=rot1,
+                rot2=rot2,
+                rot3=rot3,
+                detector=self.detector,
+                wavelength=self.calibrant.wavelength,
+            )
+            sg = SingleGeometry(
+                "extract_cp",
+                powder,
+                calibrant=self.calibrant,
+                detector=self.detector,
+                geometry=geom_initial,
+            )
+            sg.extract_cp(max_rings=max_rings, pts_per_deg=1, Imin=Imin)
+            y[i] = len(sg.geometry_refinement.data)
+            bo_history[f"init_sample_{i+1}"] = {"param": X_samples[i], "score": y[i]}
+
+        if np.all(y == 0):
+            result = {
+                "bo_history": bo_history,
+                "params": [dist, 0, 0, 0, 0, 0],
+                "residual": 0,
+                "score": 0,
+                "best_idx": 0,
+            }
+            logger.warning(
+                f"All samples have score 0 for dist={dist}. Skipping Bayesian optimization."
+            )
+            return result
+        y_norm = (y - np.mean(y)) / np.std(y)
+        best_score = np.max(y_norm)
+
+        kernel = RBF(length_scale=0.3, length_scale_bounds=(0.2, 0.4)) * ConstantKernel(
+            constant_value=1.0, constant_value_bounds=(0.5, 1.5)
+        ) + WhiteKernel(noise_level=0.001, noise_level_bounds="fixed")
+        gp_model = GaussianProcessRegressor(
+            kernel=kernel, n_restarts_optimizer=10, random_state=42
+        )
+        gp_model.fit(X_norm_samples, y_norm)
+        visited_idx = []
+
+        if af == "ucb":
+            if hyperparam is None:
+                hyperparam = {"beta": 1.96}
+            hyperparam = hyperparam["beta"]
+            af = self.upper_confidence_bound
+        elif af == "ei":
+            if hyperparam is None:
+                hyperparam = {"epsilon": 0}
+            hyperparam = hyperparam["epsilon"]
+            af = self.expected_improvement
+        elif af == "pi":
+            if hyperparam is None:
+                hyperparam = {"epsilon": 0}
+            hyperparam = hyperparam["epsilon"]
+            af = self.probability_of_improvement
+        elif af == "ci":
+            af = self.contextual_improvement
+
+        for i in range(n_iterations):
+            # 1. Generate the acquisition-function values using the Gaussian
+            #    Process Regressor
+            af_values = af(X_norm, gp_model, best_score, hyperparam)
+            af_values[visited_idx] = -np.inf
+
+            # 2. Select the next set of parameters based on the acquisition function
+            new_idx = np.argmax(af_values)
+            new_input = X[new_idx]
+            visited_idx.append(new_idx)
+
+            # 3. Compute the score of the new set of parameters
+            dist, poni1, poni2, rot1, rot2, rot3 = new_input
+            geom_initial = Geometry(
+                dist=dist,
+                poni1=poni1,
+                poni2=poni2,
+                rot1=rot1,
+                rot2=rot2,
+                rot3=rot3,
+                detector=self.detector,
+                wavelength=self.calibrant.wavelength,
+            )
+            sg = SingleGeometry(
+                "extract_cp",
+                powder,
+                calibrant=self.calibrant,
+                detector=self.detector,
+                geometry=geom_initial,
+            )
+            sg.extract_cp(max_rings=max_rings, pts_per_deg=1, Imin=Imin)
+            score = len(sg.geometry_refinement.data)
+            y = np.append(y, [score], axis=0)
+            bo_history[f"iteration_{i+1}"] = {
+                "param": X[new_idx],
+                "score": score,
+            }
+            X_samples = np.append(X_samples, [X[new_idx]], axis=0)
+            X_norm_samples = np.append(X_norm_samples, [X_norm[new_idx]], axis=0)
+            y_norm = (y - np.mean(y)) / np.std(y)
+            best_score = np.max(y_norm)
+            # 4. Update the Gaussian Process Regressor
+            gp_model.fit(X_norm_samples, y_norm)
+
+        best_idx = np.argmax(y_norm)
+        best_param = X_samples[best_idx]
+        dist, poni1, poni2, rot1, rot2, rot3 = best_param
+        geom_initial = Geometry(
+            dist=dist,
+            poni1=poni1,
+            poni2=poni2,
+            rot1=rot1,
+            rot2=rot2,
+            rot3=rot3,
+            detector=self.detector,
+            wavelength=self.calibrant.wavelength,
+        )
+        sg = SingleGeometry(
+            "extract_cp",
+            powder,
+            calibrant=self.calibrant,
+            detector=self.detector,
+            geometry=geom_initial,
+        )
+        sg.extract_cp(max_rings=max_rings, pts_per_deg=1, Imin=Imin)
+        self.sg = sg
+        score = len(sg.geometry_refinement.data)
+        residual = 0
+        if score != 0:
+            residual = sg.geometry_refinement.refine3(fix=["wavelength"])
+        params = sg.geometry_refinement.param
+        result = {
+            "bo_history": bo_history,
+            "params": params,
+            "residual": residual,
+            "score": score,
+            "best_idx": best_idx,
+        }
+        return result
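Note: `bayes_opt_center` is a textbook sample-then-maximize-acquisition loop. The skeleton below shows the same control flow on a 1-D toy objective (all values illustrative; the real score is the number of control points pyFAI extracts):

```python
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF

def objective(x):                           # stand-in for the control-point score
    return -(x - 0.3) ** 2

X = np.linspace(0, 1, 201).reshape(-1, 1)   # discretized search space
rng = np.random.default_rng(0)
idx = list(rng.choice(len(X), 5, replace=False))
y = [objective(X[i, 0]) for i in idx]       # random initialization

gp = GaussianProcessRegressor(kernel=RBF(0.2))
for _ in range(20):
    gp.fit(X[idx], y)
    mu, sigma = gp.predict(X, return_std=True)
    ucb = mu + 1.96 * sigma                 # acquisition function
    ucb[idx] = -np.inf                      # never revisit a sampled point
    nxt = int(np.argmax(ucb))               # acquisition maximizer
    idx.append(nxt)
    y.append(objective(X[nxt, 0]))

print(X[idx[np.argmax(y)], 0])              # converges near 0.3
```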
+
+    def bayes_opt_geom(
+        self,
+        powder,
+        bounds,
+        res,
+        Imin=99,
+        max_rings=5,
+        n_samples=50,
+        n_iterations=50,
+        af="ucb",
+        hyperparam=None,
+        prior=True,
+        seed=None,
+    ):
+        """
+        From a guessed initial geometry, optimize the geometry with Bayesian
+        optimization built on the pyFAI package.
+
+        Parameters
+        ----------
+        powder : np.ndarray
+            Powder image to use for calibration
+        bounds : dict
+            Dictionary of bounds for the search parameters
+        res : float
+            Resolution of the grid used to discretize the parameter search space
+        Imin : float
+            Minimum intensity threshold for control-point extraction, based on
+            the intensity-distribution percentile
+        max_rings : int
+            Maximum number of rings to use for control-point extraction
+        n_samples : int
+            Number of samples used to initialize the GP model
+        n_iterations : int
+            Number of iterations for optimization
+        af : str
+            Acquisition function to use for optimization
+        hyperparam : dict
+            Dictionary of hyperparameters for the acquisition function
+        prior : bool
+            Use prior information for optimization
+        seed : int
+            Random seed for reproducibility
+        """
+
+        if seed is not None:
+            np.random.seed(seed)
+
+        self.set_wavelength_calibrant()
+
+        mask = self.build_mask()
+        if mask is not None:
+            powder = powder * mask
+
+        self.max_rings = max_rings
+        Imin = self.min_intensity(powder)
+
+        if self.rank == 0:
+            logger.info(
+                f"Optimizing geometry for exp {self.exp} run {self.run} with "
+                f"{self.det_type} detector, minimal intensity threshold {Imin:.2e}"
+            )
+            logger.info(f"Number of distances to scan: {self.size}")
+            if isinstance(bounds["dist"], float):
+                distances = np.linspace(
+                    bounds["dist"] - 0.05, bounds["dist"] + 0.05, self.size
+                )
+            else:
+                distances = np.linspace(bounds["dist"][0], bounds["dist"][1], self.size)
+            self.distances = distances
+        else:
+            distances = None
+
+        dist = self.comm.scatter(distances, root=0)
+
+        results = self.bayes_opt_center(
+            powder,
+            dist,
+            bounds,
+            res,
+            Imin,
+            max_rings,
+            n_samples,
+            n_iterations,
+            af,
+            hyperparam,
+            prior,
+            seed,
+        )
+        self.comm.Barrier()
+
+        self.scan = {}
+        self.scan["bo_history"] = self.comm.gather(results["bo_history"], root=0)
+        self.scan["params"] = self.comm.gather(results["params"], root=0)
+        self.scan["residual"] = self.comm.gather(results["residual"], root=0)
+        self.scan["score"] = self.comm.gather(results["score"], root=0)
+        self.scan["best_idx"] = self.comm.gather(results["best_idx"], root=0)
+        self.finalize()
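Note: the distance scan parallelizes one fixed-distance optimization per MPI rank. Rank 0 builds the distance grid, `scatter` hands each rank one value, and `gather` collects the per-rank results. The bare pattern, runnable with e.g. `mpirun -np 4` (the residual formula is a stand-in for `bayes_opt_center`):

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

distances = np.linspace(0.02, 0.6, size) if rank == 0 else None
dist = comm.scatter(distances, root=0)      # one distance per rank

result = {"dist": dist, "residual": (dist - 0.31) ** 2}  # toy per-rank work
results = comm.gather(result, root=0)       # list on rank 0, None elsewhere

if rank == 0:
    best = min(results, key=lambda r: r["residual"])
    print(f"Best distance: {best['dist']:.3f} m")
```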
+
+    def finalize(self):
+        if self.rank == 0:
+            for key in self.scan.keys():
+                self.scan[key] = np.array(self.scan[key])
+            index = np.argmin(self.scan["residual"])
+            self.index = index
+            self.bo_history = self.scan["bo_history"][index]
+            self.params = self.scan["params"][index]
+            self.residual = self.scan["residual"][index]
+            self.score = self.scan["score"][index]
+            self.best_idx = self.scan["best_idx"][index]
+
+    def display(self, powder=None, cp=None, ai=None, label=None, sg=None, ax=None):
+        """
+        Display an image with the control points and the calibrated rings.
+
+        Parameters
+        ----------
+        powder : np.ndarray
+            Powder image
+        cp : ControlPoints
+            Control points extracted from the image
+        ai : Geometry
+            Geometry (azimuthal integrator) used to draw the rings
+        label : str
+            Title of the plot
+        sg : SingleGeometry
+            If provided, powder, cp, ai and label default to its attributes
+        ax : plt.Axes
+            Matplotlib axes
+        """
+        if ax is None:
+            _fig, ax = plt.subplots()
+        if sg is not None:
+            if powder is None:
+                powder = sg.image
+            if cp is None:
+                cp = sg.control_points
+            if ai is None:
+                ai = sg.geometry_refinement
+            if label is None:
+                label = sg.label
+        img = ax.imshow(
+            powder.T,
+            origin="lower",
+            cmap="viridis",
+            vmin=np.percentile(powder, 5),
+            vmax=np.percentile(powder, 95),
+        )
+        cbar = plt.colorbar(img, ax=ax, orientation="vertical")
+        cbar.set_label("Intensity")
+        if ai is not None and cp.calibrant is not None:
+            tth = cp.calibrant.get_2th()
+            ttha = ai.twoThetaArray()
+            ax.contour(
+                ttha.T, levels=tth, cmap="autumn", linewidths=0.5, linestyles="dashed"
+            )
+        return ax
+
+    def radial_integration(self, result, calibrant=None, label=None, ax=None):
+        """
+        Display the powder diffraction pattern.
+
+        Parameters
+        ----------
+        result : np.ndarray
+            Powder diffraction pattern
+        calibrant : Calibrant
+            Calibrant object
+        label : str
+            Name of the curve
+        ax : plt.Axes
+            Matplotlib axes
+        """
+        from matplotlib import lines
+
+        if ax is None:
+            _fig, ax = plt.subplots()
+
+        try:
+            unit = result.unit
+        except AttributeError:
+            unit = None
+        if len(result) == 3:
+            ax.errorbar(*result, label=label)
+        else:
+            ax.plot(*result, label=label)
+
+        if label:
+            ax.legend()
+        if calibrant and unit:
+            x_values = calibrant.get_peaks(unit)
+            if x_values is not None:
+                for x in x_values:
+                    line = lines.Line2D(
+                        [x, x],
+                        ax.axis()[2:4],
+                        color="red",
+                        linestyle="--",
+                        linewidth=0.5,
+                    )
+                    ax.add_line(line)
+
+        ax.set_title("Radial Profile")
+        if unit:
+            ax.set_xlabel(unit.label)
+        ax.set_ylabel("Intensity")
+
+    def score_distance_scan(self, distances, ax):
+        """
+        Plot the score scan over distance.
+
+        Parameters
+        ----------
+        distances : np.array
+            Array of distances
+        ax : plt.Axes
+            Matplotlib axes
+        """
+        scores = self.scan["score"]
+        ax.plot(distances, scores)
+        ax.set_xlabel("Distance (m)")
+        ax.set_ylabel("Score")
+        ax.set_title("Number of Control Points vs Distance")
+
+    def residual_distance_scan(self, distances, refined_dist, ax):
+        """
+        Plot the residual scan over distance.
+
+        Parameters
+        ----------
+        distances : np.array
+            Array of distances
+        refined_dist : float
+            Refined distance
+        ax : plt.Axes
+            Matplotlib axes
+        """
+        residuals = self.scan["residual"]
+        ax.plot(distances, residuals)
+        best_dist = distances[self.index]
+        ax.axvline(
+            best_dist,
+            color="green",
+            linestyle="--",
+            label=f"Best distance (m): {best_dist:.2e}",
+        )
+        ax.axvline(
+            refined_dist,
+            color="red",
+            linestyle="--",
+            label=f"Refined distance (m): {refined_dist:.2e}",
+        )
+        ax.legend(fontsize="x-small")
+        ax.set_yscale("log")
+        ax.set_xlabel("Distance (m)")
+        ax.set_ylabel("Residual")
+        ax.set_title("Residual vs Distance")
+
+    def hist_and_compute_stats(self, powder, exp, run, ax):
+        """
+        Plot a histogram of pixel intensities and compute statistics.
+
+        Parameters
+        ----------
+        powder : np.ndarray
+            Powder image
+        exp : str
+            Experiment name
+        run : int
+            Run number
+        ax : plt.Axes
+            Matplotlib axes
+        """
+        threshold = np.mean(powder) + 5 * np.std(powder)
+        nice_pix = powder < threshold
+        mean = np.mean(powder[nice_pix])
+        std_dev = np.std(powder[nice_pix])
+        _ = ax.hist(
+            powder[nice_pix],
+            bins=1000,
+            color="skyblue",
+            edgecolor="black",
+            alpha=0.7,
+            label="Pixel Intensities",
+        )
+        ax.axvline(
+            mean,
+            color="red",
+            linestyle="--",
+            label=f"Mean ({mean:.2f})",
+        )
+        ax.axvline(
+            mean + std_dev,
+            color="orange",
+            linestyle="--",
+            label=f"Mean + Std Dev ({mean + std_dev:.2f})",
+        )
+        ax.axvline(
+            mean + 2 * std_dev,
+            color="green",
+            linestyle="--",
+            label=f"Mean + 2 Std Dev ({mean + 2 * std_dev:.2f})",
+        )
+        ax.axvline(
+            self.Imin,
+            color="purple",
+            linestyle=":",
+            linewidth=1.5,
+            label=f"{self.q}th Percentile ({self.Imin:.2f})",
+        )
+        ax.set_xlim([0, mean + 5 * std_dev])
+        ax.set_xlabel("Pixel Intensity")
+        ax.set_ylabel("Frequency")
+        ax.set_title(f"Histogram of Pixel Intensities \n for {exp} run {run}")
+        ax.legend(fontsize="x-small")
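Note: the radial-profile panels are ordinary pyFAI azimuthal integrations. Outside the Task, the entire recipe looks like this (detector and geometry values are made up; `integrate1d` returns a result that unpacks to radial positions and intensities when no error model is set):

```python
import numpy as np
from pyFAI.detectors import Detector
from pyFAI.azimuthalIntegrator import AzimuthalIntegrator

det = Detector(pixel1=100e-6, pixel2=100e-6, max_shape=(512, 512))
ai = AzimuthalIntegrator(dist=0.1, detector=det, wavelength=1.0e-10)

img = np.random.poisson(10, size=(512, 512)).astype(float)  # stand-in powder
q, intensity = ai.integrate1d(img, 1000)  # 1000 radial bins
print(q.shape, intensity.shape)
```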
+
+    def visualize_results(
+        self, powder, bo_history, detector, params, refined_dist, plot=""
+    ):
+        """
+        Visualize the fit: (1) BO convergence, (2) histogram of pixel intensities,
+        (3) radial profile, (4) powder image with fitted rings, (5) score vs
+        distance and (6) residual vs distance.
+
+        Parameters
+        ----------
+        powder : np.ndarray
+            Powder image
+        bo_history : dict
+            Dictionary containing the history of optimization
+        detector : PyFAI(Detector)
+            Corrected PyFAI detector object
+        params : list
+            List of parameters for the best fit
+        refined_dist : float
+            Refined distance
+        plot : str
+            Path to save plot
+        """
+        fig = plt.figure(figsize=(12, 16), dpi=180)
+        nrow, ncol = 3, 2
+        irow, icol = 0, 0
+
+        # Plotting BO convergence
+        ax1 = plt.subplot2grid((nrow, ncol), (irow, icol))
+        scores = [bo_history[key]["score"] for key in bo_history.keys()]
+        ax1.plot(np.maximum.accumulate(scores))
+        ax1.set_xticks(np.arange(len(scores), step=20))
+        ax1.set_xlabel("Iteration")
+        ax1.set_ylabel("Best score so far")
+        ax1.set_title(f"Convergence Plot, best score: {self.scan['score'][self.index]}")
+        icol += 1
+
+        # Plotting histogram of pixel intensities
+        ax2 = plt.subplot2grid((nrow, ncol), (irow, icol), colspan=ncol - icol)
+        self.hist_and_compute_stats(powder, self.exp, self.run, ax2)
+        irow += 1
+        icol = 0
+
+        # Plotting radial profiles with peaks
+        ax3 = plt.subplot2grid((nrow, ncol), (irow, icol))
+        ai = AzimuthalIntegrator(
+            dist=params[0], detector=detector, wavelength=self.calibrant.wavelength
+        )
+        masked_powder = powder
+        if self.det_type.lower() == "rayonix":
+            radius = np.sqrt(2) * powder.shape[0] / 4
+            row, col = np.ogrid[: powder.shape[0], : powder.shape[1]]
+            center = (powder.shape[0] / 2, powder.shape[1] / 2)
+            mask = ((row - center[0]) ** 2 + (col - center[1]) ** 2) <= radius**2
+            masked_powder = powder * mask
+        res = ai.integrate1d(masked_powder, 1000)
+        self.radial_integration(res, calibrant=self.calibrant, ax=ax3)
+        icol += 1
+
+        # Plotting stacked powder
+        ax4 = plt.subplot2grid((nrow, ncol), (irow, icol), colspan=ncol - icol)
+        geometry = Geometry(dist=params[0])
+        sg = SingleGeometry(
+            f"Max {self.calibrant_name}",
+            powder,
+            calibrant=self.calibrant,
+            detector=detector,
+            geometry=geometry,
+        )
+        sg.extract_cp(max_rings=self.max_rings, pts_per_deg=1, Imin=self.Imin)
+        self.display(sg=sg, ax=ax4)
+        irow += 1
+        icol = 0
+
+        # Plotting score scan over distance
+        ax5 = plt.subplot2grid((nrow, ncol), (irow, icol))
+        self.score_distance_scan(self.distances, ax5)
+        icol += 1
+
+        # Plotting residual scan over distance
+        ax6 = plt.subplot2grid((nrow, ncol), (irow, icol), colspan=ncol - icol)
+        self.residual_distance_scan(self.distances, refined_dist, ax6)
+
+        if plot != "":
+            fig.savefig(plot, dpi=180)
+        return fig
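Note: the convergence panel relies on `np.maximum.accumulate` to turn raw per-iteration scores into a best-so-far curve. A tiny standalone illustration (toy scores):

```python
import numpy as np

scores = np.array([3, 7, 5, 12, 9, 15, 14])   # control points found per iteration
best_so_far = np.maximum.accumulate(scores)
print(best_so_far)  # [ 3  7  7 12 12 15 15] -- the monotone curve plotted on ax1
```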
+ """ + assert isinstance(self._task_parameters, OptimizePyFAIGeometryParameters) + in_file = self._task_parameters.in_file + det_type = self._task_parameters.det_type + ds_args = f"exp={self._task_parameters.exp}:run={self._task_parameters.run}:idx" + self.ds = psana.DataSource(ds_args) + self.det = psana.Detector(det_type, self.ds.env()) + self.shape = self.det.shape() + if det_type.lower() == "rayonix": + env = self.ds.env() + cfg = env.configStore() + self.pixel_size = cfg.get(psana.Rayonix.ConfigV2).pixelWidth() * 1e-6 + else: + self.pixel_size = self.det.pixel_size(self.ds.env()) * 1e-6 + psana_to_pyfai = PsanaToPyFAI( + in_file=in_file, + det_type=det_type, + pixel_size=self.pixel_size, + shape=self.shape, + ) + detector = psana_to_pyfai.detector + return detector + + def _check_if_path_and_type(self, string: str) -> Tuple[bool, Optional[str]]: + """ + Check if a string is a valid path and determine the filetype. + + Parameters + ---------- + string : str + String that may be a file path. + + Returns + ------- + is_valid_path : bool + If it is a valid path. + + powder_type : Optional[str] + If is_valid_path, the file type. + """ + is_valid_path: bool = False + powder_type: Optional[str] = None + if os.path.exists(string): + is_valid_path = True + else: + return is_valid_path, powder_type + try: + with h5py.File(string): + powder_type = "smd" + is_valid_path = True + + return is_valid_path, powder_type + except Exception: + ... + + try: + np.load(string) + powder_type = "numpy" + is_valid_path = True + return is_valid_path, powder_type + except ValueError: + ... + + return is_valid_path, powder_type + + def _extract_powder( + self, powder_path: str, shape: Tuple + ) -> Optional[npt.NDArray[np.float64]]: + """ + Extract a powder image from either a smalldata file or numpy array. + + Parameters + ---------- + powder_path : str + Path to the object containing the powder image. + shape : Tuple + Stacked shape of the detector. Powder image has to be reshaped to match detector shape. + + Returns + ------- + powder : Optional[npt.NDArray[np.float64]] + The extracted powder image. + Returns None if no powder could be extracted and no specific error was encountered. + """ + powder: Optional[npt.NDArray[np.float64]] = None + assert isinstance(self._task_parameters, OptimizePyFAIGeometryParameters) + if isinstance(powder_path, str): + is_valid: bool + dtype: Optional[str] + is_valid, dtype = self._check_if_path_and_type(powder_path) + if is_valid and dtype == "numpy": + powder = np.load(powder_path) + if powder is not None and powder.shape != shape: + powder = np.reshape(powder, shape) + elif is_valid and dtype == "smd": + h5: h5py.File + with h5py.File(powder_path) as h5: + try: + if self._task_parameters.det_type == "Rayonix": + powder = h5[ + f"Sums/{self._task_parameters.det_type}_calib_skipFirst_max" + ][()] + else: + powder = h5[ + f"Sums/{self._task_parameters.det_type}_calib_max" + ][()] + except KeyError: + logger.warning( + 'No "Max" powder found in SmallData. Using "Sum" powder.' 
+ ) + powder = h5[f"Sums/{self._task_parameters.det_type}_calib"][()] + if powder is not None and powder.shape != shape: + powder = np.reshape(powder, shape) + return powder + + def _update_geometry(self, optimizer): + """ + Update the geometry and write a new .geom file and .data file + + Parameters + ---------- + optimizer : BayesGeomOpt + Optimizer object + """ + assert isinstance(self._task_parameters, OptimizePyFAIGeometryParameters) + PyFAIToCrystFEL( + detector=optimizer.detector, + params=optimizer.params, + psana_file=self._task_parameters.in_file, + out_file=self._task_parameters.out_file.replace( + f"{self._task_parameters.run}-end.data", + f"r{self._task_parameters.run:0>4}.geom", + ), + ) + CrystFELToPsana( + in_file=self._task_parameters.out_file.replace( + f"{self._task_parameters.run}-end.data", + f"r{self._task_parameters.run:0>4}.geom", + ), + det_type=optimizer.det_type, + out_file=self._task_parameters.out_file, + pixel_size=self.pixel_size, + shape=self.shape, + ) + psana_to_pyfai = PsanaToPyFAI( + in_file=self._task_parameters.out_file, + det_type=self._task_parameters.det_type, + pixel_size=self.pixel_size, + shape=self.shape, + ) + detector = psana_to_pyfai.detector + return detector + + def _run(self) -> None: + assert isinstance(self._task_parameters, OptimizePyFAIGeometryParameters) + detector = self._build_pyFAI_detector() + powder = self._extract_powder(self._task_parameters.powder, detector.shape) + if powder is None: + raise RuntimeError("Unable to extract powder. Cannot continue.") + optimizer = BayesGeomOpt( + exp=self._task_parameters.exp, + run=self._task_parameters.run, + det_type=self._task_parameters.det_type, + detector=detector, + calibrant=self._task_parameters.calibrant, + ) + optimizer.bayes_opt_geom( + powder=powder, + bounds=self._task_parameters.bo_params.bounds, + res=self._task_parameters.bo_params.res, + max_rings=self._task_parameters.bo_params.max_rings, + n_samples=self._task_parameters.bo_params.n_samples, + n_iterations=self._task_parameters.bo_params.n_iterations, + af=self._task_parameters.bo_params.af, + hyperparam=self._task_parameters.bo_params.hyperparams, + prior=self._task_parameters.bo_params.prior, + seed=self._task_parameters.bo_params.seed, + ) + if optimizer.rank == 0: + logger.info("Optimization complete") + distance, cx, cy = get_beam_center(optimizer.params) + logger.info(f"Detector Distance to Sample: {distance:.2e}") + logger.info(f"Beam center: ({cx:.2e}, {cy:.2e})") + logger.info( + f"Rotations: \u03B8x = ({optimizer.params[3]:.2e}, \u03B8y = {optimizer.params[4]:.2e}, \u03B8z = {optimizer.params[5]:.2e})" + ) + logger.info(f"Final Residual: {optimizer.residual:.2e}") + fig_folder = os.path.join(self._task_parameters.work_dir, "figs") + os.makedirs(fig_folder, exist_ok=True) + plot = ( + f"{fig_folder}/bayes_opt_geom_{optimizer.exp}_r{optimizer.run:0>4}.png" + ) + calib_detector = self._update_geometry(optimizer) + fig = optimizer.visualize_results( + powder=optimizer.powder, + bo_history=optimizer.bo_history, + detector=calib_detector, + params=optimizer.params, + refined_dist=distance, + plot=plot, + ) + plots = pn.Tabs(fig) + p1, p2, _ = calib_detector.calc_cartesian_positions() + cx_pix = np.abs(cx - np.min(p1)) / calib_detector.pixel1 + cy_pix = np.abs(cy - np.min(p2)) / calib_detector.pixel2 + theta: float = np.arctan(cx / distance) + q: float = ( + 2.0 * np.sin(theta / 2.0) / (optimizer.calibrant.wavelength * 1e10) + ) + edge_resolution: float = 1.0 / q + self._result.summary = [] + 
+
+    def _run(self) -> None:
+        assert isinstance(self._task_parameters, OptimizePyFAIGeometryParameters)
+        detector = self._build_pyFAI_detector()
+        powder = self._extract_powder(self._task_parameters.powder, detector.shape)
+        if powder is None:
+            raise RuntimeError("Unable to extract powder. Cannot continue.")
+        optimizer = BayesGeomOpt(
+            exp=self._task_parameters.exp,
+            run=self._task_parameters.run,
+            det_type=self._task_parameters.det_type,
+            detector=detector,
+            calibrant=self._task_parameters.calibrant,
+        )
+        optimizer.bayes_opt_geom(
+            powder=powder,
+            bounds=self._task_parameters.bo_params.bounds,
+            res=self._task_parameters.bo_params.res,
+            max_rings=self._task_parameters.bo_params.max_rings,
+            n_samples=self._task_parameters.bo_params.n_samples,
+            n_iterations=self._task_parameters.bo_params.n_iterations,
+            af=self._task_parameters.bo_params.af,
+            hyperparam=self._task_parameters.bo_params.hyperparams,
+            prior=self._task_parameters.bo_params.prior,
+            seed=self._task_parameters.bo_params.seed,
+        )
+        if optimizer.rank == 0:
+            logger.info("Optimization complete")
+            distance, cx, cy = get_beam_center(optimizer.params)
+            logger.info(f"Detector Distance to Sample: {distance:.2e}")
+            logger.info(f"Beam center: ({cx:.2e}, {cy:.2e})")
+            logger.info(
+                f"Rotations: \u03b8x = {optimizer.params[3]:.2e}, "
+                f"\u03b8y = {optimizer.params[4]:.2e}, \u03b8z = {optimizer.params[5]:.2e}"
+            )
+            logger.info(f"Final Residual: {optimizer.residual:.2e}")
+            fig_folder = os.path.join(self._task_parameters.work_dir, "figs")
+            os.makedirs(fig_folder, exist_ok=True)
+            plot = (
+                f"{fig_folder}/bayes_opt_geom_{optimizer.exp}_r{optimizer.run:0>4}.png"
+            )
+            calib_detector = self._update_geometry(optimizer)
+            fig = optimizer.visualize_results(
+                powder=optimizer.powder,
+                bo_history=optimizer.bo_history,
+                detector=calib_detector,
+                params=optimizer.params,
+                refined_dist=distance,
+                plot=plot,
+            )
+            plots = pn.Tabs(fig)
+            p1, p2, _ = calib_detector.calc_cartesian_positions()
+            cx_pix = np.abs(cx - np.min(p1)) / calib_detector.pixel1
+            cy_pix = np.abs(cy - np.min(p2)) / calib_detector.pixel2
+            theta: float = np.arctan(cx / distance)
+            q: float = (
+                2.0 * np.sin(theta / 2.0) / (optimizer.calibrant.wavelength * 1e10)
+            )
+            edge_resolution: float = 1.0 / q
+            self._result.summary = []
+            self._result.summary.append(
+                {
+                    "Detector distance (m)": f"{distance:.3f}",
+                    "Detector center (m)": (f"{cx:.6f}", f"{cy:.6f}"),
+                    "Detector edge resolution (A)": f"{edge_resolution:.3f}",
+                }
+            )
+            logger.info(f"Beam center (pixels): ({cx_pix}, {cy_pix})")
+            logger.info(f"Detector edge resolution (A): {edge_resolution}")
+            self._result.summary.append(
+                ElogSummaryPlots(
+                    f"Geometry_Fit/r{self._task_parameters.run:0>4}", plots
+                )
+            )
+            self._result.task_status = TaskStatus.COMPLETED
diff --git a/workflows/airflow/geom_opt_pyfai.py b/workflows/airflow/geom_opt_pyfai.py
new file mode 100644
index 00000000..8a3ad98e
--- /dev/null
+++ b/workflows/airflow/geom_opt_pyfai.py
@@ -0,0 +1,41 @@
+"""Run geometry optimization for centering the detector on the beam.
+
+Performs Bayesian optimization coupled with pyFAI least-squares fitting of the
+distance, beam center and tilt angles.
+
+Note:
+    The task_id MUST match the managed task name when defining DAGs - it is used
+    by the operator to properly launch it.
+
+    dag_id names must be unique, and they are not namespaced via folder
+    hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The
+    Airflow instance used by LUTE is currently shared by other software - DAG
+    IDs should always be prefixed with `lute_`. LUTE scripts should append this
+    internally, so a DAG "lute_test" can be triggered by asking for "test".
+"""
+
+from datetime import datetime
+import os
+from airflow import DAG
+from lute.operators.jidoperators import JIDSlurmOperator
+
+dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
+description: str = (
+    "Run geometry optimization based on a given calibrant. Produce the powder "
+    "using smalldata_tools."
+)
+
+dag: DAG = DAG(
+    dag_id=dag_id,
+    start_date=datetime(2024, 9, 3),
+    schedule_interval=None,
+    description=description,
+    is_paused_upon_creation=False,
+)
+
+smd_producer: JIDSlurmOperator = JIDSlurmOperator(task_id="SmallDataProducer", dag=dag)
+
+geom_optimizer: JIDSlurmOperator = JIDSlurmOperator(
+    max_cores=120, task_id="PyFAIGeometryOptimizer", dag=dag
+)
+
+
+# Powder production and geometry optimization
+smd_producer >> geom_optimizer
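Note: the closing `>>` is standard Airflow dependency syntax. Equivalent spellings, should the DAG grow more steps (operator names reuse the ones defined above):

```python
# All three declare "geometry optimization runs after powder production":
smd_producer >> geom_optimizer
geom_optimizer << smd_producer
geom_optimizer.set_upstream(smd_producer)
```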