diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py index 9ef1080f..80a377cf 100644 --- a/python/lsst/ctrl/mpexec/cmdLineFwk.py +++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py @@ -807,7 +807,6 @@ def runPipeline( skipExistingIn=args.skip_existing_in, clobberOutputs=args.clobber_outputs, enableLsstDebug=args.enableLsstDebug, - exitOnKnownError=args.fail_fast, resources=resources, ) @@ -991,7 +990,6 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None: butler=None, taskFactory=task_factory, enableLsstDebug=args.enableLsstDebug, - exitOnKnownError=args.fail_fast, limited_butler_factory=_butler_factory, resources=resources, assumeNoExistingOutputs=True, diff --git a/python/lsst/ctrl/mpexec/mpGraphExecutor.py b/python/lsst/ctrl/mpexec/mpGraphExecutor.py index 718a989b..e740e164 100644 --- a/python/lsst/ctrl/mpexec/mpGraphExecutor.py +++ b/python/lsst/ctrl/mpexec/mpGraphExecutor.py @@ -43,7 +43,7 @@ from typing import Literal from lsst.daf.butler.cli.cliLog import CliLog -from lsst.pipe.base import InvalidQuantumError +from lsst.pipe.base import InvalidQuantumError, RepeatableQuantumError from lsst.pipe.base.graph.graph import QuantumGraph, QuantumNode from lsst.pipe.base.pipeline_graph import TaskNode from lsst.utils.threads import disable_implicit_threading @@ -74,8 +74,9 @@ class _Job: Quantum and some associated information. 
""" - def __init__(self, qnode: QuantumNode): + def __init__(self, qnode: QuantumNode, fail_fast: bool = False): self.qnode = qnode + self._fail_fast = fail_fast self.process: multiprocessing.process.BaseProcess | None = None self._state = JobState.PENDING self.started: float = 0.0 @@ -122,7 +123,7 @@ def start( mp_ctx = multiprocessing.get_context(startMethod) self.process = mp_ctx.Process( # type: ignore[attr-defined] target=_Job._executeJob, - args=(quantumExecutor, task_node, quantum_pickle, logConfigState, snd_conn), + args=(quantumExecutor, task_node, quantum_pickle, logConfigState, snd_conn, self._fail_fast), name=f"task-{self.qnode.quantum.dataId}", ) # mypy is getting confused by multiprocessing. @@ -138,6 +139,7 @@ def _executeJob( quantum_pickle: bytes, logConfigState: list, snd_conn: multiprocessing.connection.Connection, + fail_fast: bool, ) -> None: """Execute a job with arguments. @@ -168,8 +170,33 @@ def _executeJob( quantum = pickle.loads(quantum_pickle) report: QuantumReport | None = None + # Catch a few known failure modes and stop the process immediately, + # with exception-specific exit code. 
try: - quantum, report = quantumExecutor.execute(task_node, quantum) + _, report = quantumExecutor.execute(task_node, quantum) + except RepeatableQuantumError as exc: + report = QuantumReport.from_exception( + exception=exc, + dataId=quantum.dataId, + taskLabel=task_node.label, + exitCode=exc.EXIT_CODE if fail_fast else None, + ) + if fail_fast: + _LOG.warning("Caught repeatable quantum error for %s (%s):", task_node.label, quantum.dataId) + _LOG.warning(exc, exc_info=True) + sys.exit(exc.EXIT_CODE) + else: + raise + except InvalidQuantumError as exc: + _LOG.fatal("Invalid quantum error for %s (%s):", task_node.label, quantum.dataId) + _LOG.fatal(exc, exc_info=True) + report = QuantumReport.from_exception( + exception=exc, + dataId=quantum.dataId, + taskLabel=task_node.label, + exitCode=exc.EXIT_CODE, + ) + sys.exit(exc.EXIT_CODE) except Exception as exc: _LOG.debug("exception from task %s dataId %s: %s", task_node.label, quantum.dataId, exc) report = QuantumReport.from_exception( @@ -490,11 +517,31 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None: continue _LOG.debug("Executing %s", qnode) + fail_exit_code: int | None = None try: - _, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum) - if quantum_report: - report.quantaReports.append(quantum_report) - successCount += 1 + # For some exception types we want to exit immediately with + # exception-specific exit code, but we still want to start + # debugger before exiting if debugging is enabled. 
+ try: + _, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum) + if quantum_report: + report.quantaReports.append(quantum_report) + successCount += 1 + except RepeatableQuantumError as exc: + if self.failFast: + _LOG.warning( + "Caught repeatable quantum error for %s (%s):", + task_node.label, + qnode.quantum.dataId, + ) + _LOG.warning(exc, exc_info=True) + fail_exit_code = exc.EXIT_CODE + raise + except InvalidQuantumError as exc: + _LOG.fatal("Invalid quantum error for %s (%s):", task_node.label, qnode.quantum.dataId) + _LOG.fatal(exc, exc_info=True) + fail_exit_code = exc.EXIT_CODE + raise except Exception as exc: quantum_report = QuantumReport.from_exception( exception=exc, @@ -523,6 +570,11 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None: pdb.post_mortem(exc.__traceback__) failedNodes.add(qnode) report.status = ExecutionStatus.FAILURE + + # If exception specified an exit code then just exit with that + # code, otherwise crash if fail-fast option is enabled. + if fail_exit_code is not None: + sys.exit(fail_exit_code) if self.failFast: raise MPGraphExecutorError( f"Task <{task_node.label} dataId={qnode.quantum.dataId}> failed." diff --git a/python/lsst/ctrl/mpexec/reports.py b/python/lsst/ctrl/mpexec/reports.py index 1d02dc01..124f7e49 100644 --- a/python/lsst/ctrl/mpexec/reports.py +++ b/python/lsst/ctrl/mpexec/reports.py @@ -146,6 +146,8 @@ def from_exception( exception: Exception, dataId: DataId, taskLabel: str, + *, + exitCode: int | None = None, ) -> QuantumReport: """Construct report instance from an exception and other pieces of data. @@ -158,11 +160,15 @@ def from_exception( Data ID of quantum. taskLabel : `str` Label of task. + exitCode : `int`, optional + Exit code for the process, used when it is known that the process + will exit with that exit code. 
""" return cls( status=ExecutionStatus.FAILURE, dataId=dataId, taskLabel=taskLabel, + exitCode=exitCode, exceptionInfo=ExceptionInfo.from_exception(exception), ) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index c9424693..bebad460 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -31,7 +31,6 @@ # Imports of standard modules -- # ------------------------------- import logging -import sys import time from collections import defaultdict from collections.abc import Callable @@ -56,7 +55,6 @@ NoWorkFound, PipelineTask, QuantumContext, - RepeatableQuantumError, TaskFactory, ) from lsst.pipe.base.pipeline_graph import TaskNode @@ -104,11 +102,6 @@ class SingleQuantumExecutor(QuantumExecutor): a quantum will be removed. Only used when ``butler`` is not `None`. enableLsstDebug : `bool`, optional Enable debugging with ``lsstDebug`` facility for a task. - exitOnKnownError : `bool`, optional - If `True`, call `sys.exit` with the appropriate exit code for special - known exceptions, after printing a traceback, instead of letting the - exception propagate up to calling. This is always the behavior for - InvalidQuantumError. limited_butler_factory : `Callable`, optional A method that creates a `~lsst.daf.butler.LimitedButler` instance for a given Quantum. 
This parameter must be defined if ``butler`` is @@ -135,7 +128,6 @@ def __init__( skipExistingIn: Any = None, clobberOutputs: bool = False, enableLsstDebug: bool = False, - exitOnKnownError: bool = False, limited_butler_factory: Callable[[Quantum], LimitedButler] | None = None, resources: ExecutionResources | None = None, skipExisting: bool = False, @@ -145,7 +137,6 @@ def __init__( self.taskFactory = taskFactory self.enableLsstDebug = enableLsstDebug self.clobberOutputs = clobberOutputs - self.exitOnKnownError = exitOnKnownError self.limited_butler_factory = limited_butler_factory self.resources = resources self.assumeNoExistingOutputs = assumeNoExistingOutputs @@ -480,25 +471,13 @@ def runQuantum( # Get the input and output references for the task inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum) - # Call task runQuantum() method. Catch a few known failure modes and - # translate them into specific + # Call task runQuantum() method. try: task.runQuantum(butlerQC, inputRefs, outputRefs) except NoWorkFound as err: # Not an error, just an early exit. _LOG.info("Task '%s' on quantum %s exited early: %s", task_node.label, quantum.dataId, str(err)) pass - except RepeatableQuantumError as err: - if self.exitOnKnownError: - _LOG.warning("Caught repeatable quantum error for %s (%s):", task_node.label, quantum.dataId) - _LOG.warning(err, exc_info=True) - sys.exit(err.EXIT_CODE) - else: - raise - except InvalidQuantumError as err: - _LOG.fatal("Invalid quantum error for %s (%s): %s", task_node.label, quantum.dataId) - _LOG.fatal(err, exc_info=True) - sys.exit(err.EXIT_CODE) def writeMetadata( self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler