diff --git a/python/lsst/pipe/base/_status.py b/python/lsst/pipe/base/_status.py
index caab95564..e74187e3f 100644
--- a/python/lsst/pipe/base/_status.py
+++ b/python/lsst/pipe/base/_status.py
@@ -27,16 +27,23 @@
 
 from __future__ import annotations
 
+import abc
+import logging
+from typing import Protocol
+
+from lsst.utils import introspection
+
+from ._task_metadata import GetSetDictMetadata, NestedMetadataDict
+
 __all__ = (
+    "UnprocessableDataError",
+    "AnnotatedPartialOutputsError",
     "NoWorkFound",
     "RepeatableQuantumError",
+    "AlgorithmError",
     "InvalidQuantumError",
 )
 
-from typing import Protocol
-
-from ._task_metadata import GetSetDictMetadata
-
 
 class GetSetDictMetadataHolder(Protocol):
     """Protocol for objects that have a ``metadata`` attribute that satisfies
@@ -86,6 +93,123 @@ class RepeatableQuantumError(RuntimeError):
     EXIT_CODE = 20
 
 
+class AlgorithmError(RepeatableQuantumError, abc.ABC):
+    """Exception that may be raised by PipelineTasks (and code they delegate
+    to) in order to indicate a repeatable algorithmic failure that will not be
+    addressed by retries.
+
+    Subclass this exception to define the metadata associated with the error
+    (for example: number of data points in a fit vs. degrees of freedom).
+    """
+
+    @property
+    @abc.abstractmethod
+    def metadata(self) -> NestedMetadataDict | None:
+        """Metadata from the raising `~lsst.pipe.base.Task` with more
+        information about the failure. The contents of the dict are
+        `~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
+        `int`, `float`, `bool`, or nested-dictionary (with the same key and
+        value types) values.
+        """
+        raise NotImplementedError
+
+
+class UnprocessableDataError(RepeatableQuantumError):
+    """Exception to be subclassed and raised by Tasks to indicate a
+    non-recoverable failure to process their inputs.
+
+    Notes
+    -----
+    An example is a known bright star that causes PSF measurement to fail,
+    making that detector entirely unprocessable.
+
+    Do not raise this unless we are convinced that the data cannot be
+    processed, even by a better algorithm. Most instances where this error
+    would be raised likely require an RFC to explicitly define the situation.
+    """
+
+
+class AnnotatedPartialOutputsError(RepeatableQuantumError):
+    """Exception that ``runQuantum`` raises when the (partial) outputs it has
+    written contain information about their own incompleteness or degraded
+    quality.
+
+    This exception should always chain the original error. When the
+    executor catches this exception, it will report the original exception. In
+    contrast, other exceptions raised from ``runQuantum`` are considered to
+    invalidate any outputs that are already written.
+    """
+
+    @classmethod
+    def annotate(
+        cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger
+    ) -> AnnotatedPartialOutputsError:
+        """Set metadata on outputs to explain the nature of the failure.
+
+        Parameters
+        ----------
+        error : `Exception`
+            Exception that caused the task to fail.
+        *args : `GetSetDictMetadataHolder`
+            Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
+            failure information. They must have a ``metadata`` property.
+        log : `logging.Logger`
+            Logger to send the error message to.
+
+        Returns
+        -------
+        error : `AnnotatedPartialOutputsError`
+            Exception for the failing task to raise, chained
+            (``raise ... from``) from the passed-in exception.
+
+        Notes
+        -----
+        This should be called from within an except block that has caught an
+        exception. Here is an example of handling a failure in
+        ``PipelineTask.runQuantum`` that annotates and writes partial outputs:
+
+        .. code-block:: py
+            :name: annotate-error-example
+
+            def runQuantum(self, butlerQC, inputRefs, outputRefs):
+                inputs = butlerQC.get(inputRefs)
+                exposures = inputs.pop("exposures")
+                assert not inputs, "runQuantum got more inputs than expected"
+
+                result = pipeBase.Struct(catalog=None)
+                try:
+                    self.run(exposures, result=result)
+                except pipeBase.AlgorithmError as e:
+                    error = pipeBase.AnnotatedPartialOutputsError.annotate(
+                        e, self, result.catalog, log=self.log
+                    )
+                    raise error from e
+                finally:
+                    butlerQC.put(result, outputRefs)
+        """
+        failure_info = {
+            "message": str(error),
+            "type": introspection.get_full_type_name(error),
+        }
+        if other := getattr(error, "metadata", None):
+            failure_info["metadata"] = other
+
+        # NOTE: Can't fully test this in pipe_base because afw is not a
+        # dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
+        for item in args:
+            # Some outputs may not exist, so we cannot set metadata on them.
+            if item is None:
+                continue
+            item.metadata.set_dict("failure", failure_info)  # type: ignore
+
+        log.exception(
+            "Task failed with only partial outputs; see exception message for details.",
+            exc_info=error,
+        )
+
+        return cls("Task failed and wrote partial outputs: see chained exception for details.")
+
+
 class InvalidQuantumError(Exception):
     """Exception that may be raised by PipelineTasks (and code they delegate
     to) in order to indicate logic bug or configuration problem.
diff --git a/python/lsst/pipe/base/task.py b/python/lsst/pipe/base/task.py
index 2067eb926..14965b565 100644
--- a/python/lsst/pipe/base/task.py
+++ b/python/lsst/pipe/base/task.py
@@ -31,6 +31,7 @@
 from typing import TYPE_CHECKING, Any, ClassVar
 
 import lsst.utils
+import lsst.utils.introspection
 import lsst.utils.logging
 from lsst.pex.config import ConfigurableField
 from lsst.utils.timer import logInfo
diff --git a/tests/test_task.py b/tests/test_task.py
index bf246d2f0..e2930415a 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -325,6 +325,41 @@ def testTimeMethod(self):
         new_meta = yaml.safe_load(y)
         self.assertEqual(new_meta, addMultTask.metadata)
 
+    def test_annotate_exception(self):
+        """Test annotating failures in the task metadata when a non-Task
+        exception is raised (when there is no `metadata` on the exception).
+        """
+        task = AddMultTask()
+        msg = "something failed!"
+        error = ValueError(msg)
+        with self.assertLogs("addMult", level="ERROR") as cm:
+            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
+        self.assertIn(msg, "\n".join(cm.output))
+        self.assertEqual(task.metadata["failure"]["message"], msg)
+        self.assertEqual(task.metadata["failure"]["type"], "ValueError")
+        self.assertNotIn("metadata", task.metadata["failure"])
+
+    def test_annotate_task_exception(self):
+        """Test annotating failures in the task metadata when a Task-specific
+        exception is raised.
+        """
+
+        class TestError(pipeBase.AlgorithmError):
+            @property
+            def metadata(self):
+                return {"something": 12345}
+
+        task = AddMultTask()
+        msg = "something failed!"
+        error = TestError(msg)
+        with self.assertLogs("addMult", level="ERROR") as cm:
+            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
+        self.assertIn(msg, "\n".join(cm.output))
+        self.assertEqual(task.metadata["failure"]["message"], msg)
+        result = "test_task.TaskTestCase.test_annotate_task_exception..TestError"
+        self.assertEqual(task.metadata["failure"]["type"], result)
+        self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)
+
 
 class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
     """Run file leak tests."""
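
To make the new ``AlgorithmError`` contract concrete, here is a brief sketch (not part of the diff above) of how a downstream task package might subclass it. The class name ``PsfFitError`` and the fields ``n_points``/``n_parameters`` are hypothetical, but the shape mirrors the abstract ``metadata`` property and the ``TestError`` helper used in the new unit tests:

.. code-block:: py

    import lsst.pipe.base as pipeBase


    class PsfFitError(pipeBase.AlgorithmError):
        """Hypothetical error: a PSF fit had too few data points."""

        def __init__(self, msg, *, n_points, n_parameters):
            super().__init__(msg)
            self.n_points = n_points
            self.n_parameters = n_parameters

        @property
        def metadata(self):
            # Keys must be str; values must be str/int/float/bool or nested
            # dicts of the same types, per the AlgorithmError docstring.
            return {"n_points": self.n_points, "n_parameters": self.n_parameters}

A task's ``run`` method would raise this when the fit is underconstrained; ``runQuantum`` then catches it and chains it into ``AnnotatedPartialOutputsError``, as in the docstring example above.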
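
Continuing that sketch, the round trip through ``annotate`` can be exercised without a full ``PipelineTask``. Here ``SimpleNamespace`` is an illustrative stand-in for any output object whose ``metadata`` attribute satisfies ``GetSetDictMetadata`` (in real code this would be an exposure, a catalog, or the task itself), and the assertions mirror ``test_annotate_task_exception``:

.. code-block:: py

    import logging
    from types import SimpleNamespace

    import lsst.pipe.base as pipeBase

    log = logging.getLogger("example")
    # Stand-in for a partial output that can carry metadata.
    output = SimpleNamespace(metadata=pipeBase.TaskMetadata())

    try:
        raise PsfFitError("PSF fit failed", n_points=3, n_parameters=5)
    except pipeBase.AlgorithmError as e:
        error = pipeBase.AnnotatedPartialOutputsError.annotate(e, output, log=log)
        # annotate() wrote a nested "failure" dict into the output's metadata.
        assert output.metadata["failure"]["message"] == "PSF fit failed"
        assert output.metadata["failure"]["metadata"]["n_points"] == 3
        # In runQuantum, the partial outputs would now be written and then:
        # raise error from e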