Skip to content

Commit

Permalink
ENH Stream file merge task (#15)
Browse files Browse the repository at this point in the history
* ENH Stream file merging task

* ENH Added header to module

* ENH Reading parameters for stream merging task from database

* ENH Default out_file parameter for stream file merging task

* BUG Remove unused options from parameter model

* BUG Switched to internal method to print feedback

* ENH Renamed task

* Fixed bugs in task renaming and
  • Loading branch information
valmar authored Apr 4, 2024
1 parent ff26ea7 commit 06128d9
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 9 deletions.
6 changes: 3 additions & 3 deletions lute/io/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Pydantic models for Task parameters and configuration."""

from .base import *
from .tests import *
from .smd import *
from .sfx_find_peaks import *
from .sfx_index import *
from .sfx_merge import *
from .sfx_solve import *
from .sfx_find_peaks import *
from .smd import *
from .tests import *
62 changes: 56 additions & 6 deletions lute/io/models/sfx_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@
CrystFEL's `indexamajig`.
"""

__all__ = ["IndexCrystFELParameters"]
__all__ = ["IndexCrystFELParameters", "ConcatenateStreamFilesParameters"]
__author__ = "Gabriel Dorlhiac"

import os
from typing import Union, List, Optional, Dict, Any
from pathlib import Path
from typing import Any, Dict, Optional

from pydantic import (
AnyUrl,
PositiveInt,
PositiveFloat,
NonNegativeInt,
Field,
NonNegativeInt,
PositiveFloat,
PositiveInt,
conint,
validator,
)

from .base import BaseBinaryParameters
from ..db import read_latest_db_entry
from .base import BaseBinaryParameters, TaskParameters


class IndexCrystFELParameters(BaseBinaryParameters):
Expand Down Expand Up @@ -403,3 +404,52 @@ def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
fname: str = f"{expmt}_r{run:04d}.stream"
return f"{work_dir}/{fname}"
return out_file


class ConcatenateStreamFilesParameters(TaskParameters):
    """Parameters for the stream file concatenation task.

    Every field defaults to an empty string. When a field is left empty, its
    validator tries to derive a value:

    * ``in_file`` and ``tag`` are derived from the value returned by
      ``read_latest_db_entry`` for the ``out_file`` entry of ``IndexCrystFEL``
      (its parent directory and the part of its file name before the first
      underscore, respectively).
    * ``out_file`` is derived from the already-validated ``in_file`` and
      ``tag``.

    The validators are declared with ``always=True`` because pydantic does not
    run validators on fields left at their default unless asked to; without
    it the fallbacks below would never be applied.
    """

    in_file: str = Field(
        "",
        description="Root of directory tree storing stream files to merge.",
    )

    tag: Optional[str] = Field(
        "",
        description="Tag identifying the stream files to merge.",
    )

    out_file: str = Field(
        "",
        description="Path to merged output stream file.",
    )

    @validator("in_file", always=True)
    def validate_in_file(cls, in_file: str, values: Dict[str, Any]) -> str:
        """Default ``in_file`` to the directory of the latest indexing output.

        Args:
            in_file: Value supplied for the field ("" when not provided).
            values: Previously validated fields; ``lute_config`` is read from
                it to obtain the working directory.

        Returns:
            The supplied value, or the parent directory of the stream file
            found in the database. The empty string is returned unchanged
            when no database entry is found.
        """
        if in_file == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                return str(Path(stream_file).parent)
        return in_file

    @validator("tag", always=True)
    def validate_tag(cls, tag: str, values: Dict[str, Any]) -> str:
        """Default ``tag`` to the prefix of the latest indexing output name.

        Args:
            tag: Value supplied for the field ("" when not provided).
            values: Previously validated fields; ``lute_config`` is read from
                it to obtain the working directory.

        Returns:
            The supplied value, or the part of the stream file name that
            precedes the first underscore. The empty string is returned
            unchanged when no database entry is found.
        """
        if tag == "":
            stream_file: Optional[str] = read_latest_db_entry(
                f"{values['lute_config'].work_dir}", "IndexCrystFEL", "out_file"
            )
            if stream_file:
                return Path(stream_file).name.split("_")[0]
        return tag

    @validator("out_file", always=True)
    def validate_out_file(cls, out_file: str, values: Dict[str, Any]) -> str:
        """Default ``out_file`` to ``<parent of in_file>/<tag>.stream``.

        ``in_file`` and ``tag`` are declared before ``out_file``, so their
        validated values are available in ``values`` here.

        Args:
            out_file: Value supplied for the field ("" when not provided).
            values: Previously validated fields; ``in_file`` and ``tag`` are
                read from it.

        Returns:
            The supplied value, or the derived default output path.
        """
        if out_file == "":
            # The ".stream" suffix belongs to the file name; it must sit
            # outside the braces (inside, it is an attribute lookup on a str).
            return str(Path(values["in_file"]).parent / f"{values['tag']}.stream")
        return out_file
1 change: 1 addition & 0 deletions lute/managed_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@
# Executor bound to the "RunSHELXC" task name.
SHELXCRunner: Executor = Executor("RunSHELXC")
# Register the CCP4 setup script with the SHELXC executor.
# NOTE(review): presumably sourced to set up the task's environment — confirm
# against Executor.shell_source.
SHELXCRunner.shell_source("/sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh")
# Executor bound to the "FindPeaksPsocake" task name.
PeakFinderPsocake: Executor = Executor("FindPeaksPsocake")
# Executor bound to the "ConcatenateStreamFiles" task name.
StreamFileConcatenator: Executor = Executor("ConcatenateStreamFiles")
6 changes: 6 additions & 0 deletions lute/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""

from typing import Type

from .task import Task


Expand Down Expand Up @@ -60,4 +61,9 @@ def import_task(task_name: str) -> Type[Task]:

return FindPeaksPyAlgos

if task_name == "ConcatenateStreamFiles":
from .sfx_index import ConcatenateStreamFiles

return ConcatenateStreamFiles

raise TaskNotFoundError
13 changes: 13 additions & 0 deletions lute/tasks/sfx_find_peaks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
"""
Classes for peak finding tasks in SFX.
Classes:
CxiWriter: utility class for writing peak finding results to CXI files.
FindPeaksPyAlgos: peak finding using psana's PyAlgos algorithm. Optional data
compression and decompression with libpressio for data reduction tests.
"""

__all__ = ["CxiWriter", "FindPeaksPyAlgos"]
__author__ = "Valerio Mariani"

import sys
from pathlib import Path
from typing import Any, Dict, List, Literal, TextIO, Tuple
Expand Down
53 changes: 53 additions & 0 deletions lute/tasks/sfx_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Classes for indexing tasks in SFX.
Classes:
ConcatenateStreamFiles: task that merges multiple stream files into a single file.
"""

__all__ = ["ConcatenateStreamFiles"]
__author__ = "Valerio Mariani"

import shutil
import sys
from pathlib import Path
from typing import BinaryIO, List

import numpy
from mpi4py import MPI

from lute.execution.ipc import Message
from lute.io.models.base import *
from lute.tasks.task import *


class ConcatenateStreamFiles(Task):
    """
    Task that merges stream files located within a directory tree.

    The tree rooted at the ``in_file`` task parameter is searched recursively
    for files named ``<tag>_*.stream``. Their contents are concatenated, in
    sorted path order, into the file given by the ``out_file`` parameter.
    """

    def __init__(self, *, params: TaskParameters) -> None:
        super().__init__(params=params)

    def _run(self) -> None:
        """Find the matching stream files and concatenate them.

        The list of files to merge is reported to the executor before any
        data is written. The output file is opened in binary write mode, so
        an existing file at that path is overwritten.
        """
        out_file: str = self._task_parameters.out_file
        out_resolved: Path = Path(out_file).resolve()

        stream_file_path: Path = Path(self._task_parameters.in_file)
        # rglob yields entries in arbitrary (filesystem) order; sort so the
        # merged file is reproducible between runs. The output file is
        # excluded in case it lives in the search tree and matches the
        # pattern: it is truncated below and must not be read as an input.
        stream_file_list: List[Path] = sorted(
            stream_file
            for stream_file in stream_file_path.rglob(
                f"{self._task_parameters.tag}_*.stream"
            )
            if stream_file.resolve() != out_resolved
        )

        processed_file_list: List[str] = [
            str(stream_file) for stream_file in stream_file_list
        ]

        msg: Message = Message(
            contents=f"Merging following stream files: {processed_file_list} into "
            f"{self._task_parameters.out_file}",
        )
        self._report_to_executor(msg)

        wfd: BinaryIO
        with open(out_file, "wb") as wfd:
            infile: Path
            for infile in stream_file_list:
                fd: BinaryIO
                # Binary streaming copy: file contents are appended verbatim.
                with open(infile, "rb") as fd:
                    shutil.copyfileobj(fd, wfd)

0 comments on commit 06128d9

Please sign in to comment.