From 06128d97306f0bde0ead438f15b53395e15e616f Mon Sep 17 00:00:00 2001 From: valmar Date: Thu, 4 Apr 2024 14:43:05 -0700 Subject: [PATCH] ENH Stream file merge task (#15) * ENH Stream file merging task * ENH Added header to module * ENH Reading parameters for stream merging task from database * ENH Default out_file parameter for stream file merging task * BUG Remove unused options from parameter model * BUG Switched to internal method to print feedback * ENH Renamed task * Fixed bugs in task renaming and --- lute/io/models/__init__.py | 6 ++-- lute/io/models/sfx_index.py | 62 ++++++++++++++++++++++++++++++++---- lute/managed_tasks.py | 1 + lute/tasks/__init__.py | 6 ++++ lute/tasks/sfx_find_peaks.py | 13 ++++++++ lute/tasks/sfx_index.py | 53 ++++++++++++++++++++++++++++++ 6 files changed, 132 insertions(+), 9 deletions(-) create mode 100644 lute/tasks/sfx_index.py diff --git a/lute/io/models/__init__.py b/lute/io/models/__init__.py index 087a4780..a221844f 100644 --- a/lute/io/models/__init__.py +++ b/lute/io/models/__init__.py @@ -1,9 +1,9 @@ """Pydantic models for Task parameters and configuration.""" from .base import * -from .tests import * -from .smd import * +from .sfx_find_peaks import * from .sfx_index import * from .sfx_merge import * from .sfx_solve import * -from .sfx_find_peaks import * +from .smd import * +from .tests import * diff --git a/lute/io/models/sfx_index.py b/lute/io/models/sfx_index.py index 57c6d346..901bb0c3 100644 --- a/lute/io/models/sfx_index.py +++ b/lute/io/models/sfx_index.py @@ -5,24 +5,25 @@ CrystFEL's `indexamajig`. 
class ConcatenateStreamFilesParameters(TaskParameters):
    """Parameters for the task that concatenates stream files into one file.

    Each field defaults to an empty string. When a validator receives an
    empty string it derives a value instead:

    * `in_file` and `tag` are derived from the latest `out_file` entry that
      `read_latest_db_entry` returns for the `IndexCrystFEL` task.
    * `out_file` is derived from the already-validated `in_file` and `tag`.

    The `out_file` validator relies on `in_file` and `tag` being declared
    (and therefore validated) before it.
    """

    # NOTE(review): these validators are declared without `always=True`.
    # Unless the base model's config enables validation of defaults, pydantic
    # will not call them when a field is omitted entirely - confirm against
    # `TaskParameters`.

    in_file: str = Field(
        "",
        description="Root of directory tree storing stream files to merge.",
    )

    tag: Optional[str] = Field(
        "",
        description="Tag identifying the stream files to merge.",
    )

    out_file: str = Field(
        "",
        description="Path to merged output stream file.",
    )

    @validator("in_file")
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default `in_file` to the directory of the latest indexing output.

        Args:
            in_file: Value supplied for the field.
            values: Previously validated fields; `lute_config` is read from it.

        Returns:
            `in_file` unchanged when it is non-empty or when no database entry
            is found, otherwise the parent directory of the stored stream file.
        """
        if in_file == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                stream_dir: str = str(Path(stream_file).parent)
                return stream_dir
        return in_file

    @validator("tag")
    def validate_tag(cls, tag: str, values: Dict[str, Any]) -> str:
        """Default `tag` to the prefix of the latest indexing output file name.

        Args:
            tag: Value supplied for the field.
            values: Previously validated fields; `lute_config` is read from it.

        Returns:
            `tag` unchanged when it is non-empty or when no database entry is
            found, otherwise the part of the stored stream file's name that
            precedes the first underscore.
        """
        if tag == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                stream_tag: str = Path(stream_file).name.split("_")[0]
                return stream_tag
        return tag

    @validator("out_file")
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Default `out_file` to `<parent of in_file>/<tag>.stream`.

        Args:
            out_file: Value supplied for the field.
            values: Previously validated fields; `in_file` and `tag` are read.

        Returns:
            `out_file` unchanged when it is non-empty, otherwise the derived
            default path.
        """
        if out_file != "":
            return out_file
        # A field that failed its own validation is absent from `values`.
        # Leave the value untouched in that case so the original validation
        # error is what gets reported, not a KeyError raised from here.
        if "in_file" not in values or "tag" not in values:
            return out_file
        # The extension belongs outside the braces: `values['tag']` is a plain
        # string, so `values['tag'].stream` would raise AttributeError.
        stream_out_file: str = str(
            Path(values["in_file"]).parent / f"{values['tag']}.stream"
        )
        return stream_out_file
class ConcatenateStreamFiles(Task):
    """
    Task that merges stream files located within a directory tree.

    The directory given by the `in_file` parameter is searched recursively
    for files named `<tag>_*.stream`, and their contents are written back to
    back into the file given by the `out_file` parameter.
    """

    def __init__(self, *, params: TaskParameters) -> None:
        super().__init__(params=params)

    def _run(self) -> None:
        """Concatenate every matching stream file into the output file.

        Input files are processed in sorted path order so that the merged
        output is the same from run to run. The output file itself is never
        used as an input, even if it lives under the search root and matches
        the search pattern.
        """
        stream_file_path: Path = Path(self._task_parameters.in_file)
        out_file_resolved: Path = Path(self._task_parameters.out_file).resolve()

        # `rglob` yields entries in arbitrary order, hence the explicit sort.
        # Skipping the output path prevents reading the file that is
        # simultaneously being written.
        stream_file_list: List[Path] = sorted(
            stream_file
            for stream_file in stream_file_path.rglob(
                f"{self._task_parameters.tag}_*.stream"
            )
            if stream_file.resolve() != out_file_resolved
        )

        processed_file_list = [str(stream_file) for stream_file in stream_file_list]

        msg: Message = Message(
            contents=f"Merging following stream files: {processed_file_list} into "
            f"{self._task_parameters.out_file}",
        )
        self._report_to_executor(msg)

        # Binary mode: the contents are copied verbatim, with no decoding.
        wfd: BinaryIO
        with open(self._task_parameters.out_file, "wb") as wfd:
            infile: Path
            for infile in stream_file_list:
                fd: BinaryIO
                with open(infile, "rb") as fd:
                    shutil.copyfileobj(fd, wfd)