Skip to content

Commit

Permalink
ENH Added Psocake peak finding task
Browse files Browse the repository at this point in the history
  • Loading branch information
valmar committed Mar 27, 2024
1 parent 3b07026 commit 5d5c03c
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 12 deletions.
55 changes: 55 additions & 0 deletions config/templates/sz.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"compressor_id": "pressio",
"early_config": {
"pressio": {
"pressio:compressor": "roibin",
"roibin": {
"roibin:metric": "composite",
"roibin:background": "mask_binning",
"roibin:roi": "fpzip",
"background": {
"binning:compressor": "pressio",
"mask_binning:compressor": "pressio",
"pressio": {
"pressio:compressor": {{ compressor | tojson }}
}
},
"composite": {
"composite:plugins": [
"size",
"time",
"input_stats",
"error_stat"
]
}
}
}
},
"compressor_config": {
"pressio": {
"roibin": {
"roibin:roi_size": [
{{ roiWindowSize}},
{{ roiWindowSize}},
0
],
"roibin:centers": null,
"roibin:nthreads": 4,
"roi": {
"fpzip:prec": 0
},
"background": {
"mask_binning:mask": null,
"mask_binning:shape": [
{{ binSize }},
{{ binSize }},
1
],
"mask_binning:nthreads": 4,
"pressio": {{ pressio_opts | tojson }}
}
}
}
},
"name": "pressio"
}
19 changes: 10 additions & 9 deletions lute/execution/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@
]
__author__ = "Gabriel Dorlhiac"

import _io
import logging
import os
import sys
import socket
import pickle
import subprocess
import select
import logging
import socket
import subprocess
import sys
import warnings
from typing import Optional, Any, Set
from typing_extensions import Self
from dataclasses import dataclass
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional, Set

import _io
from typing_extensions import Self

LUTE_SIGNALS: Set[str] = {
"NO_PICKLE_MODE",
Expand Down Expand Up @@ -182,7 +183,7 @@ def read(self, proc: subprocess.Popen) -> Message:
contents = raw_contents.decode()
except UnicodeDecodeError as err:
logger.debug("PipeCommunicator (Executor) - Set _use_pickle=True")
self._use_pickle = True
# self._use_pickle = True
contents = self._safe_unpickle_decode(raw_contents)
else:
contents = None
Expand Down
185 changes: 183 additions & 2 deletions lute/io/models/sfx_find_peaks.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
from pathlib import Path
from typing import Any, Dict, Literal, Optional, Union

from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, PositiveInt, validator

from .base import BaseBinaryParameters, TaskParameters
from .base import BaseBinaryParameters, TaskParameters, TemplateConfig


class FindPeaksPyAlgosParameters(TaskParameters):
Expand Down Expand Up @@ -122,3 +123,183 @@ def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
)
return str(fname)
return out_file


class FindPeaksPsocakeParameters(BaseBinaryParameters):

class SZParameters(BaseModel):
compressor: Literal["qoz", "sz3"] = Field(
"qoz", description="SZ compression algorithm (qoz, sz3)"
)
binSize: int = Field(2, description="SZ compression's bin size paramater")
roiWindowSize: int = Field(
2, description="SZ compression's ROI window size paramater"
)
absError: float = Field(10, descriptionp="Maximum absolute error value")

executable: str = Field("mpirun", description="MPI executable.", flag_type="")
np: PositiveInt = Field(
max(int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1),
description="Number of processes",
flag_type="-",
)
mca: str = Field(
"btl ^openib", description="Mca option for the MPI executable", flag_type="--"
)
p_arg1: str = Field(
"python", description="Executable to run with mpi (i.e. python).", flag_type=""
)
u: str = Field(
"", description="Python option for unbuffered output.", flag_type="-"
)
p_arg2: str = Field(
"findPeaksSZ.py",
description="Executable to run with mpi (i.e. python).",
flag_type="",
)
d: str = Field(description="Detector name", flag_type="-")
e: str = Field("", description="Experiment name", flag_type="-")
r: int = Field(-1, description="Run number", flag_type="-")
outDir: str = Field(
description="Output directory where .cxi will be saved", flag_type="--"
)
algorithm: int = Field(1, description="PyAlgos algorithm to use", flag_type="--")
alg_npix_min: float = Field(
1.0, description="PyAlgos algorithm's npix_min parameter", flag_type="--"
)
alg_npix_max: float = Field(
45.0, description="PyAlgos algorithm's npix_max parameter", flag_type="--"
)
alg_amax_thr: float = Field(
250.0, description="PyAlgos algorithm's amax_thr parameter", flag_type="--"
)
alg_atot_thr: float = Field(
330.0, description="PyAlgos algorithm's atot_thr parameter", flag_type="--"
)
alg_son_min: float = Field(
10.0, description="PyAlgos algorithm's son_min parameter", flag_type="--"
)
alg1_thr_low: float = Field(
80.0, description="PyAlgos algorithm's thr_low parameter", flag_type="--"
)
alg1_thr_high: float = Field(
270.0, description="PyAlgos algorithm's thr_high parameter", flag_type="--"
)
alg1_rank: int = Field(
3, description="PyAlgos algorithm's rank parameter", flag_type="--"
)
alg1_radius: int = Field(
3, description="PyAlgos algorithm's radius parameter", flag_type="--"
)
alg1_dr: int = Field(
1, description="PyAlgos algorithm's dr parameter", flag_type="--"
)
psanaMask_on: str = Field(
"True", description="Whether psana's mask should be used", flag_type="--"
)
psanaMask_calib: str = Field(
"True", description="Psana mask's calib parameter", flag_type="--"
)
psanaMask_status: str = Field(
"True", description="Psana mask's status parameter", flag_type="--"
)
psanaMask_edges: str = Field(
"True", description="Psana mask's edges parameter", flag_type="--"
)
psanaMask_central: str = Field(
"True", description="Psana mask's central parameter", flag_type="--"
)
psanaMask_unbond: str = Field(
"True", description="Psana mask's unbond parameter", flag_type="--"
)
psanaMask_unbondnrs: str = Field(
"True", description="Psana mask's unbondnbrs parameter", flag_type="--"
)
mask: str = Field(
"", description="Path to an additional mask to apply", flag_type="--"
)
clen: str = Field(
description="Epics variable storing the camera length", flag_type="--"
)
coffset: float = Field(0, description="Camera offset in m", flag_type="--")
minPeaks: int = Field(
15,
description="Minimum number of peaks to mark frame for indexing",
flag_type="--",
)
maxPeaks: int = Field(
15,
description="Maximum number of peaks to mark frame for indexing",
flag_type="--",
)
minRes: int = Field(
0,
description="Minimum peak resolution to mark frame for indexing ",
flag_type="--",
)
sample: str = Field("", description="Sample name", flag_type="--")
instrument: Union[None, str] = Field(
None, description="Instrument name", flag_type="--"
)
pixelSize: float = Field(0.0, description="Pixel size", lag_type="--")
auto: str = Field(
"False",
description=(
"Whether to automatically determine peak per event peak "
"finding parameters"
),
flag_type="--",
)
detectorDistance: float = Field(
0.0, description="Detector distance from interaction point in m"
)
access: Literal["ana", "ffb"] = Field(
"ana", description="Data node type: {ana,ffb}"
)
szfile: str = Field("qoz.json", description="Path to SZ's JSON configuration file")
lute_template_cfg: TemplateConfig = Field(
TemplateConfig(
template_name="sz.json",
output_path="", # Will want to change where this goes...
),
description="Template information for the sz.json file",
)
sz_parameters: SZParameters = Field(
description="Configuration parameters for SZ Compression", flag_type=""
)

@validator("e")
def validate_e(cls, e: str, values: Dict[str, Any]) -> str:
if e == "":
return values["lute_config"].experiment
return e

@validator("r")
def validate_r(cls, r: int, values: Dict[str, Any]) -> int:
if r == -1:
return values["lute_config"].run
return r

@validator("lute_template_cfg", always=True)
def set_output_path(
cls, lute_template_cfg: TemplateConfig, values: Dict[str, Any]
) -> TemplateConfig:
if lute_template_cfg.output_path == "":
lute_template_cfg.output_path = values["szfile"]
return lute_template_cfg

@validator("sz_parameters", always=True)
def set_sz_compression_parameters(
cls, sz_parameters: SZParameters, values: Dict[str, Any]
) -> SZParameters:
values["compressor"] = sz_parameters.compressor
values["binSize"] = sz_parameters.binSize
values["roiWindowSize"] = sz_parameters.roiWindowSize
if sz_parameters.compressor == "qoz":
values["pressio_opts"] = {
"pressio:abs": sz_parameters.absError,
"qoz": {"qoz:stride": 8},
}
else:
values["pressio_opts"] = {"pressio:abs": sz_parameters.absError}
return None
3 changes: 2 additions & 1 deletion lute/managed_tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Dict

from .io.config import *
from .execution.executor import *
from .io.config import *

# Tests
#######
Expand Down Expand Up @@ -35,3 +35,4 @@
PeakFinderPyAlgos: MPIExecutor = MPIExecutor("FindPeaksPyAlgos")
SHELXCRunner: Executor = Executor("RunSHELXC")
SHELXCRunner.shell_source("/sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh")
PeakFinderPsocake: Executor = Executor("FindPeaksPsocake")

0 comments on commit 5d5c03c

Please sign in to comment.