diff --git a/config/templates/sz.json b/config/templates/sz.json new file mode 100644 index 00000000..59301b09 --- /dev/null +++ b/config/templates/sz.json @@ -0,0 +1,55 @@ +{ + "compressor_id": "pressio", + "early_config": { + "pressio": { + "pressio:compressor": "roibin", + "roibin": { + "roibin:metric": "composite", + "roibin:background": "mask_binning", + "roibin:roi": "fpzip", + "background": { + "binning:compressor": "pressio", + "mask_binning:compressor": "pressio", + "pressio": { + "pressio:compressor": {{ compressor | tojson }} + } + }, + "composite": { + "composite:plugins": [ + "size", + "time", + "input_stats", + "error_stat" + ] + } + } + } + }, + "compressor_config": { + "pressio": { + "roibin": { + "roibin:roi_size": [ + {{ roiWindowSize}}, + {{ roiWindowSize}}, + 0 + ], + "roibin:centers": null, + "roibin:nthreads": 4, + "roi": { + "fpzip:prec": 0 + }, + "background": { + "mask_binning:mask": null, + "mask_binning:shape": [ + {{ binSize }}, + {{ binSize }}, + 1 + ], + "mask_binning:nthreads": 4, + "pressio": {{ pressio_opts | tojson }} + } + } + } + }, + "name": "pressio" +} diff --git a/lute/execution/ipc.py b/lute/execution/ipc.py index cafe0ab1..e81bd5ec 100644 --- a/lute/execution/ipc.py +++ b/lute/execution/ipc.py @@ -22,20 +22,21 @@ ] __author__ = "Gabriel Dorlhiac" -import _io +import logging import os -import sys -import socket import pickle -import subprocess import select -import logging +import socket +import subprocess +import sys import warnings -from typing import Optional, Any, Set -from typing_extensions import Self -from dataclasses import dataclass from abc import ABC, abstractmethod +from dataclasses import dataclass from enum import Enum +from typing import Any, Optional, Set + +import _io +from typing_extensions import Self LUTE_SIGNALS: Set[str] = { "NO_PICKLE_MODE", @@ -182,7 +183,7 @@ def read(self, proc: subprocess.Popen) -> Message: contents = raw_contents.decode() except UnicodeDecodeError as err: logger.debug("PipeCommunicator (Executor) - Set _use_pickle=True") - self._use_pickle = True + # self._use_pickle = True contents = self._safe_unpickle_decode(raw_contents) else: contents = None diff --git a/lute/io/models/sfx_find_peaks.py b/lute/io/models/sfx_find_peaks.py index a1b42822..40b86814 100644 --- a/lute/io/models/sfx_find_peaks.py +++ b/lute/io/models/sfx_find_peaks.py @@ -1,9 +1,10 @@ +import os from pathlib import Path from typing import Any, Dict, Literal, Optional, Union -from pydantic import BaseModel, Field, validator +from pydantic import BaseModel, Field, PositiveInt, validator -from .base import BaseBinaryParameters, TaskParameters +from .base import BaseBinaryParameters, TaskParameters, TemplateConfig class FindPeaksPyAlgosParameters(TaskParameters): @@ -122,3 +123,183 @@ def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str: ) return str(fname) return out_file + + +class FindPeaksPsocakeParameters(BaseBinaryParameters): + + class SZParameters(BaseModel): + compressor: Literal["qoz", "sz3"] = Field( + "qoz", description="SZ compression algorithm (qoz, sz3)" + ) + binSize: int = Field(2, description="SZ compression's bin size paramater") + roiWindowSize: int = Field( + 2, description="SZ compression's ROI window size paramater" + ) + absError: float = Field(10, descriptionp="Maximum absolute error value") + + executable: str = Field("mpirun", description="MPI executable.", flag_type="") + np: PositiveInt = Field( + max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1), + description="Number of processes", + flag_type="-", + ) + mca: str = Field( + "btl ^openib", description="Mca option for the MPI executable", flag_type="--" + ) + p_arg1: str = Field( + "python", description="Executable to run with mpi (i.e. python).", flag_type="" + ) + u: str = Field( + "", description="Python option for unbuffered output.", flag_type="-" + ) + p_arg2: str = Field( + "findPeaksSZ.py", + description="Executable to run with mpi (i.e. python).", + flag_type="", + ) + d: str = Field(description="Detector name", flag_type="-") + e: str = Field("", description="Experiment name", flag_type="-") + r: int = Field(-1, description="Run number", flag_type="-") + outDir: str = Field( + description="Output directory where .cxi will be saved", flag_type="--" + ) + algorithm: int = Field(1, description="PyAlgos algorithm to use", flag_type="--") + alg_npix_min: float = Field( + 1.0, description="PyAlgos algorithm's npix_min parameter", flag_type="--" + ) + alg_npix_max: float = Field( + 45.0, description="PyAlgos algorithm's npix_max parameter", flag_type="--" + ) + alg_amax_thr: float = Field( + 250.0, description="PyAlgos algorithm's amax_thr parameter", flag_type="--" + ) + alg_atot_thr: float = Field( + 330.0, description="PyAlgos algorithm's atot_thr parameter", flag_type="--" + ) + alg_son_min: float = Field( + 10.0, description="PyAlgos algorithm's son_min parameter", flag_type="--" + ) + alg1_thr_low: float = Field( + 80.0, description="PyAlgos algorithm's thr_low parameter", flag_type="--" + ) + alg1_thr_high: float = Field( + 270.0, description="PyAlgos algorithm's thr_high parameter", flag_type="--" + ) + alg1_rank: int = Field( + 3, description="PyAlgos algorithm's rank parameter", flag_type="--" + ) + alg1_radius: int = Field( + 3, description="PyAlgos algorithm's radius parameter", flag_type="--" + ) + alg1_dr: int = Field( + 1, description="PyAlgos algorithm's dr parameter", flag_type="--" + ) + psanaMask_on: str = Field( + "True", description="Whether psana's mask should be used", flag_type="--" + ) + psanaMask_calib: str = Field( + "True", description="Psana mask's calib parameter", flag_type="--" + ) + psanaMask_status: str = Field( + "True", description="Psana mask's status parameter", flag_type="--" + ) + psanaMask_edges: str = Field( + "True", description="Psana mask's edges parameter", flag_type="--" + ) + psanaMask_central: str = Field( + "True", description="Psana mask's central parameter", flag_type="--" + ) + psanaMask_unbond: str = Field( + "True", description="Psana mask's unbond parameter", flag_type="--" + ) + psanaMask_unbondnrs: str = Field( + "True", description="Psana mask's unbondnbrs parameter", flag_type="--" + ) + mask: str = Field( + "", description="Path to an additional mask to apply", flag_type="--" + ) + clen: str = Field( + description="Epics variable storing the camera length", flag_type="--" + ) + coffset: float = Field(0, description="Camera offset in m", flag_type="--") + minPeaks: int = Field( + 15, + description="Minimum number of peaks to mark frame for indexing", + flag_type="--", + ) + maxPeaks: int = Field( + 15, + description="Maximum number of peaks to mark frame for indexing", + flag_type="--", + ) + minRes: int = Field( + 0, + description="Minimum peak resolution to mark frame for indexing ", + flag_type="--", + ) + sample: str = Field("", description="Sample name", flag_type="--") + instrument: Union[None, str] = Field( + None, description="Instrument name", flag_type="--" + ) + pixelSize: float = Field(0.0, description="Pixel size", lag_type="--") + auto: str = Field( + "False", + description=( + "Whether to automatically determine peak per event peak " + "finding parameters" + ), + flag_type="--", + ) + detectorDistance: float = Field( + 0.0, description="Detector distance from interaction point in m" + ) + access: Literal["ana", "ffb"] = Field( + "ana", description="Data node type: {ana,ffb}" + ) + szfile: str = Field("qoz.json", description="Path to SZ's JSON configuration file") + lute_template_cfg: TemplateConfig = Field( + TemplateConfig( + template_name="sz.json", + output_path="", # Will want to change where this goes... + ), + description="Template information for the sz.json file", + ) + sz_parameters: SZParameters = Field( + description="Configuration parameters for SZ Compression", flag_type="" + ) + + @validator("e") + def validate_e(cls, e: str, values: Dict[str, Any]) -> str: + if e == "": + return values["lute_config"].experiment + return e + + @validator("r") + def validate_r(cls, r: int, values: Dict[str, Any]) -> int: + if r == -1: + return values["lute_config"].run + return r + + @validator("lute_template_cfg", always=True) + def set_output_path( + cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any] + ) -> TemplateConfig: + if lute_template_cfg.output_path == "": + lute_template_cfg.output_path = values["szfile"] + return lute_template_cfg + + @validator("sz_parameters", always=True) + def set_sz_compression_parameters( + cls, sz_parameters: SZParameters, values: Dict[str, Any] + ) -> SZParameters: + values["compressor"] = sz_parameters.compressor + values["binSize"] = sz_parameters.binSize + values["roiWindowSize"] = sz_parameters.roiWindowSize + if sz_parameters.compressor == "qoz": + values["pressio_opts"] = { + "pressio:abs": sz_parameters.absError, + "qoz": {"qoz:stride": 8}, + } + else: + values["pressio_opts"] = {"pressio:abs": sz_parameters.absError} + return None diff --git a/lute/managed_tasks.py b/lute/managed_tasks.py index e927e8a9..ac71f464 100644 --- a/lute/managed_tasks.py +++ b/lute/managed_tasks.py @@ -1,7 +1,7 @@ from typing import Dict -from .io.config import * from .execution.executor import * +from .io.config import * # Tests ####### @@ -35,3 +35,4 @@ PeakFinderPyAlgos: MPIExecutor = MPIExecutor("FindPeaksPyAlgos") SHELXCRunner: Executor = Executor("RunSHELXC") SHELXCRunner.shell_source("/sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh") +PeakFinderPsocake: Executor = Executor("FindPeaksPsocake")