Skip to content

Commit

Permalink
Merge pull request #19 from gadorlhiac/BUG/launch_bug
Browse files Browse the repository at this point in the history
BUG Airflow/ARP bugs (Part 2)
  • Loading branch information
valmar authored Apr 11, 2024
2 parents 0a2df25 + 43ceab0 commit d59e8c3
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 22 deletions.
3 changes: 2 additions & 1 deletion launch_scripts/submit_slurm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ EOF
}

POSITIONAL=()

while [[ $# -gt 0 ]]
do
flag="$1"
Expand Down Expand Up @@ -67,7 +68,7 @@ RUN="${RUN_TIME_ARR[0]}"
FORMAT_RUN=$(printf "%04d" ${RUN:-0})
LOG_FILE="${TASK}_${EXPERIMENT:-$EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')"
SLURM_ARGS+=" --output=${LOG_FILE}.out"
SLURM_ARGS+=" --error=${LOG_FILE}.err"
SLURM_ARGS+=" --error=${LOG_FILE}.out"

export LUTE_SOCKET="/tmp/lute_${RANDOM}.sock"

Expand Down
11 changes: 10 additions & 1 deletion lute/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
__all__ = ["BaseExecutor", "Executor", "MPIExecutor"]
__author__ = "Gabriel Dorlhiac"

import sys
import _io
import logging
import subprocess
Expand Down Expand Up @@ -127,7 +128,7 @@ def __init__(
result: TaskResult = TaskResult(
task_name=task_name, task_status=TaskStatus.PENDING, summary="", payload=""
)
task_parameters: TaskParameters = TaskParameters()
task_parameters: Optional[TaskParameters] = None
task_env: Dict[str, str] = os.environ.copy()
self._communicators: List[Communicator] = communicators
communicator_desc: List[str] = []
Expand Down Expand Up @@ -315,6 +316,10 @@ def execute_task(self) -> None:
for comm in self._communicators:
comm.clear_communicator()

if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
logger.info("Exiting after Task failure. Result recorded.")
sys.exit(-1)

def _store_configuration(self) -> None:
"""Store configuration and results in the LUTE database."""
record_analysis_db(copy.deepcopy(self._analysis_desc))
Expand Down Expand Up @@ -519,3 +524,7 @@ def execute_task(self) -> None:
self._store_configuration()
for comm in self._communicators:
comm.clear_communicator()

if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
logger.info("Exiting after Task failure. Result recorded.")
sys.exit(-1)
9 changes: 6 additions & 3 deletions lute/io/_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ def _make_task_table(
current_cols: Dict[str, str] = _get_table_cols(con, task_name)
if diff := _compare_cols(current_cols, columns):
for col in diff.items():
sql: str = f"ALTER TABLE {task_name} ADD COLUMN {col[0]} {col[1]}"
sql: str = f'ALTER TABLE {task_name} ADD COLUMN "{col[0]}" {col[1]}'
logger.debug(f"_make_task_table[ALTER]: {sql}")
with con:
con.execute(sql)

Expand All @@ -172,6 +173,7 @@ def _make_task_table(
"valid_flag INTEGER)"
)
sql: str = f"CREATE TABLE IF NOT EXISTS {db_str}"
logger.debug(f"_make_task_table[CREATE]: {sql}")
with con:
con.execute(sql)
return _does_table_exist(con, task_name)
Expand Down Expand Up @@ -214,14 +216,15 @@ def _add_task_entry(
entry (Dict[str, Any]): A dictionary of entries in the format of
{COLUMN: ENTRY}. These are assumed to match the columns of the table.
"""
placeholder_str: str = ", ".join("?" for x in range(len(entry)))
placeholder_str: str = ", ".join("?" for _ in range(len(entry)))
keys: List[str] = []
values: List[str] = []
for key, value in entry.items():
keys.append(f'"{key}"')
values.append(value)
with con:
ins_str: str = "".join(f':"{x}", ' for x in entry.keys())[:-2]
# ins_str: str = "".join(f':"{x}", ' for x in entry.keys())[:-2]
logger.debug(f"_add_task_entry: {keys}\n\t\t{values}")
res = con.execute(
f"INSERT INTO {task_name} ({','.join(keys)}) VALUES ({placeholder_str})",
values,
Expand Down
24 changes: 18 additions & 6 deletions lute/io/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import logging
from typing import List, Dict, Dict, Any, Tuple, Optional

from .models.base import TaskParameters
from .models.base import TaskParameters, TemplateParameters
from ..tasks.dataclasses import TaskResult, TaskStatus, DescribedAnalysis

if __debug__:
Expand Down Expand Up @@ -188,13 +188,16 @@ def _dict_to_flatdicts(
flat_key = key
else:
flat_key = f"{curr_key}.{key}"
if isinstance(value, dict):
x, y = _dict_to_flatdicts(value, curr_key=flat_key)
corrected_value: Any = value
if isinstance(corrected_value, TemplateParameters):
corrected_value = value.params
if isinstance(corrected_value, dict):
x, y = _dict_to_flatdicts(corrected_value, curr_key=flat_key)
param_list.extend(x.items())
type_list.extend(y.items())
else:
param_list.append((flat_key, value))
type_list.append((flat_key, _check_type(value)))
param_list.append((flat_key, corrected_value))
type_list.append((flat_key, _check_type(corrected_value)))

return dict(param_list), dict(type_list)

Expand All @@ -216,7 +219,16 @@ def record_analysis_db(cfg: DescribedAnalysis) -> None:
_add_task_entry,
)

work_dir: str = cfg.task_parameters.lute_config.work_dir
try:
work_dir: str = cfg.task_parameters.lute_config.work_dir
except AttributeError:
logger.info(
(
"Unable to access TaskParameters object. Likely wasn't created. "
"Cannot store result."
)
)
return
del cfg.task_parameters.lute_config.work_dir

exec_entry, exec_columns = _cfg_to_exec_entry_cols(cfg)
Expand Down
32 changes: 27 additions & 5 deletions lute/io/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@ class AnalysisHeader(BaseModel):
"LUTE Task Configuration",
description="Description of the configuration or experiment.",
)
experiment: str = Field("EXPX00000", description="Experiment.")
run: Union[str, int] = Field(
os.environ.get("RUN", ""), description="Data acquisition run."
)
experiment: str = Field("", description="Experiment.")
run: Union[str, int] = Field("", description="Data acquisition run.")
date: str = Field("1970/01/01", description="Start date of analysis.")
lute_version: Union[float, str] = Field(
0.1, description="Version of LUTE used for analysis."
Expand All @@ -71,8 +69,32 @@ def validate_work_dir(cls, work_dir: str, values: Dict[str, Any]) -> str:
f"/sdf/data/lcls/ds/{values['experiment'][:3]}/"
f"{values['experiment']}/scratch"
)
# Check existence and permissions
if not os.path.exists(work_dir):
raise ValueError(f"Working Directory: {work_dir} does not exist!")
if not os.access(work_dir, os.W_OK):
# Need write access for database, files etc.
raise ValueError(f"Not write access for working directory: {work_dir}!")
return work_dir

@validator("run", always=True)
def validate_run(
    cls, run: Union[str, int], values: Dict[str, Any]
) -> Union[str, int]:
    """Fill in the run number from the environment when none was given.

    Args:
        run (Union[str, int]): The configured run. An empty string is
            treated as "not provided".

        values (Dict[str, Any]): Passed by the validation framework. Unused
            by this validator.

    Returns:
        run (Union[str, int]): The supplied run when it is not the empty
            string. Otherwise the integer prefix of the `RUN_NUM`
            environment variable, or the empty string when that variable is
            unset or empty.

    Raises:
        ValueError: If `RUN_NUM` is set but the part before the first "_"
            is not an integer.
    """
    if run != "":
        return run
    # From Airflow, RUN_NUM should have the format "RUN_DATETIME" - the run
    # number is the part before the first underscore.
    run_time: str = os.environ.get("RUN_NUM", "")
    if run_time == "":
        return run
    run_str: str = run_time.split("_")[0]
    try:
        return int(run_str)
    except ValueError as err:
        # Same exception type as a bare int() failure, but name the variable
        # and the expected format so the validation error is actionable.
        raise ValueError(
            f"Cannot extract run number from RUN_NUM={run_time!r}: "
            "expected format 'RUN_DATETIME'."
        ) from err

@validator("experiment", always=True)
def validate_experiment(cls, experiment: str, values: Dict[str, Any]) -> str:
    """Fill in the experiment name from the environment when none was given.

    Args:
        experiment (str): The configured experiment. An empty string is
            treated as "not provided".

        values (Dict[str, Any]): Passed by the validation framework. Unused
            by this validator.

    Returns:
        experiment (str): The supplied experiment when it is not the empty
            string. Otherwise the value of the `EXPERIMENT` environment
            variable, or the placeholder "EXPX00000" when that variable is
            unset or empty.
    """
    if experiment != "":
        return experiment
    # Use `or` rather than a .get() default so that an EXPERIMENT variable
    # which is exported but empty also falls back to the placeholder instead
    # of yielding an empty experiment name.
    return os.environ.get("EXPERIMENT") or "EXPX00000"


class TaskParameters(BaseSettings):
"""Base class for models of task parameters to be validated.
Expand All @@ -95,7 +117,7 @@ class Config:
copy_on_model_validation: str = "deep"
allow_inf_nan: bool = False

lute_config: AnalysisHeader = AnalysisHeader()
lute_config: AnalysisHeader


@dataclass
Expand Down
8 changes: 4 additions & 4 deletions lute/io/models/sfx_find_peaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class SZParameters(BaseModel):
instrument: Union[None, str] = Field(
None, description="Instrument name", flag_type="--"
)
pixelSize: float = Field(0.0, description="Pixel size", lag_type="--")
pixelSize: float = Field(0.0, description="Pixel size", flag_type="--")
auto: str = Field(
"False",
description=(
Expand All @@ -251,10 +251,10 @@ class SZParameters(BaseModel):
flag_type="--",
)
detectorDistance: float = Field(
0.0, description="Detector distance from interaction point in m"
0.0, description="Detector distance from interaction point in m", flag_type="--"
)
access: Literal["ana", "ffb"] = Field(
"ana", description="Data node type: {ana,ffb}"
"ana", description="Data node type: {ana,ffb}", flag_type="--"
)
szfile: str = Field("qoz.json", description="Path to SZ's JSON configuration file")
lute_template_cfg: TemplateConfig = Field(
Expand Down Expand Up @@ -291,7 +291,7 @@ def set_output_path(
@validator("sz_parameters", always=True)
def set_sz_compression_parameters(
cls, sz_parameters: SZParameters, values: Dict[str, Any]
) -> SZParameters:
) -> None:
values["compressor"] = sz_parameters.compressor
values["binSize"] = sz_parameters.binSize
values["roiWindowSize"] = sz_parameters.roiWindowSize
Expand Down
2 changes: 1 addition & 1 deletion lute/tasks/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TaskResult:
@dataclass
class DescribedAnalysis:
task_result: TaskResult
task_parameters: TaskParameters
task_parameters: Optional[TaskParameters]
task_env: Dict[str, str]
poll_interval: float
communicator_desc: List[str]
5 changes: 4 additions & 1 deletion lute/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,10 @@ def _pre_run(self) -> None:
self._args_list.append(f"{constructed_flag}")
else:
warnings.warn(
"Model parameters should be defined using Field(...,flag_type='') in the future.",
(
f"Model parameters should be defined using Field(...,flag_type='')"
f" in the future. Parameter: {param}"
),
category=PendingDeprecationWarning,
)
if len(param) == 1: # Single-dash flags
Expand Down
9 changes: 9 additions & 0 deletions workflows/airflow/operators/jidoperators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
__all__ = ["JIDSlurmOperator", "RequestOnlyOperator"]
__author__ = "Fred Poitevin, Murali Shankar"

import sys
import uuid
import getpass
import time
Expand Down Expand Up @@ -329,6 +330,14 @@ def execute(self, context: Dict[str, Any]) -> None:
# Logs out to xcom
out = self.rpc("job_log_file", jobs[0], context)
context["task_instance"].xcom_push(key="log", value=out)
failure_messages: List[str] = [
"INFO:lute.execution.executor:Task failed with return code:",
"INFO:lute.execution.executor:Exiting after Task failure.",
]
for msg in failure_messages:
if msg in out:
logger.info("Logs indicate `Task` failed.")
sys.exit(-1)


class JIDPlugins(AirflowPlugin):
Expand Down

0 comments on commit d59e8c3

Please sign in to comment.