diff --git a/python/lsst/pipe/base/_quantumContext.py b/python/lsst/pipe/base/_quantumContext.py
index 306d4fb26..c13b35efb 100644
--- a/python/lsst/pipe/base/_quantumContext.py
+++ b/python/lsst/pipe/base/_quantumContext.py
@@ -348,7 +348,8 @@ def put(
             ``[calexp1, calexp2]``. Like wise if there is a single ref, then
             only a single object need be passed. The same restriction applies
             if dataset is directly a `list` of `~lsst.daf.butler.DatasetRef`
-            or a single `~lsst.daf.butler.DatasetRef`.
+            or a single `~lsst.daf.butler.DatasetRef`. If ``values.NAME`` is
+            `None`, no output is written for that dataset.
         dataset : `OutputQuantizedConnection` or `list`[`DatasetRef`] \
             or `DatasetRef`
             This argument may either be an `InputQuantizedConnection` which
@@ -371,7 +372,8 @@ def put(
                 " attributes must be passed as the values to put"
             )
             for name, refs in dataset:
-                valuesAttribute = getattr(values, name)
+                if (valuesAttribute := getattr(values, name, None)) is None:
+                    continue
                 if isinstance(refs, list | tuple):
                     if len(refs) != len(valuesAttribute):
                         raise ValueError(f"There must be a object to put for every Dataset ref in {name}")
diff --git a/python/lsst/pipe/base/_status.py b/python/lsst/pipe/base/_status.py
index caab95564..e74187e3f 100644
--- a/python/lsst/pipe/base/_status.py
+++ b/python/lsst/pipe/base/_status.py
@@ -27,16 +27,23 @@
 
 from __future__ import annotations
 
+import abc
+import logging
+from typing import Protocol
+
+from lsst.utils import introspection
+
+from ._task_metadata import GetSetDictMetadata, NestedMetadataDict
+
 __all__ = (
+    "UnprocessableDataError",
+    "AnnotatedPartialOutputsError",
     "NoWorkFound",
     "RepeatableQuantumError",
+    "AlgorithmError",
     "InvalidQuantumError",
 )
 
-from typing import Protocol
-
-from ._task_metadata import GetSetDictMetadata
-
 
 class GetSetDictMetadataHolder(Protocol):
     """Protocol for objects that have a ``metadata`` attribute that satisfies
@@ -86,6 +93,124 @@ class RepeatableQuantumError(RuntimeError):
 
     EXIT_CODE = 20
 
 
+class AlgorithmError(RepeatableQuantumError, abc.ABC):
+    """Exception that may be raised by PipelineTasks (and code they delegate
+    to) in order to indicate a repeatable algorithmic failure that will not be
+    addressed by retries.
+
+    Subclass this exception to define the metadata associated with the error
+    (for example: number of data points in a fit vs. degrees of freedom).
+    """
+
+    @property
+    @abc.abstractmethod
+    def metadata(self) -> NestedMetadataDict | None:
+        """Metadata from the raising `~lsst.pipe.base.Task` with more
+        information about the failure. The contents of the dict are
+        `~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
+        `int`, `float`, `bool`, or nested-dictionary (with the same key and
+        value types) values.
+        """
+        raise NotImplementedError
+
+
+class UnprocessableDataError(RepeatableQuantumError):
+    """Exception to be subclassed and raised by Tasks to indicate a
+    non-recoverable failure to process their inputs.
+
+    Notes
+    -----
+    An example is a known bright star that causes PSF measurement to fail,
+    making that detector entirely non-recoverable.
+
+    Do not raise this unless we are convinced that the data cannot be
+    processed, even by a better algorithm. Most instances where this error
+    would be raised likely require an RFC to explicitly define the situation.
+    """
+
+
+class AnnotatedPartialOutputsError(RepeatableQuantumError):
+    """Exception that ``runQuantum`` raises when the (partial) outputs it
+    has written contain information about their own incompleteness or
+    degraded quality.
+
+    This exception should always chain the original error. When the
+    executor catches this exception, it will report the original exception.
+    In contrast, other exceptions raised from ``runQuantum`` are considered
+    to invalidate any outputs that are already written.
+    """
+
+    @classmethod
+    def annotate(
+        cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger
+    ) -> AnnotatedPartialOutputsError:
+        """Set metadata on outputs to explain the nature of the failure.
+
+        Parameters
+        ----------
+        error : `Exception`
+            Exception that caused the task to fail.
+        *args : `GetSetDictMetadataHolder` or `None`
+            Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
+            failure information. They must have a ``metadata`` attribute;
+            `None` entries are skipped.
+        log : `logging.Logger`
+            Log to send the error message to.
+
+        Returns
+        -------
+        error : `AnnotatedPartialOutputsError`
+            Exception that the failing task can raise, via
+            ``raise ... from``, with the passed-in exception.
+
+        Notes
+        -----
+        This should be called from within an except block that has caught an
+        exception. Here is an example of handling a failure in
+        ``PipelineTask.runQuantum`` that annotates and writes partial
+        outputs:
+
+        .. code-block:: py
+            :name: annotate-error-example
+
+            def runQuantum(self, butlerQC, inputRefs, outputRefs):
+                inputs = butlerQC.get(inputRefs)
+                exposure = inputs.pop("exposure")
+                assert not inputs, "runQuantum got more inputs than expected"
+
+                result = pipeBase.Struct(catalog=None)
+                try:
+                    self.run(exposure, result=result)  # run() fills result in place
+                except pipeBase.AlgorithmError as e:
+                    error = pipeBase.AnnotatedPartialOutputsError.annotate(
+                        e, self, result.catalog, log=self.log
+                    )
+                    raise error from e
+                finally:
+                    butlerQC.put(result, outputRefs)
+        """
+        failure_info = {
+            "message": str(error),
+            "type": introspection.get_full_type_name(error),
+        }
+        if other := getattr(error, "metadata", None):
+            failure_info["metadata"] = other
+
+        # NOTE: Can't fully test this in pipe_base because afw is not a
+        # dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
+        for item in args:
+            # Some outputs may not exist, so we cannot set metadata on them.
+            if item is None:
+                continue
+            item.metadata.set_dict("failure", failure_info)  # type: ignore
+
+        log.exception(
+            "Task failed with only partial outputs; see exception message for details.",
+            exc_info=error,
+        )
+
+        return cls("Task failed and wrote partial outputs: see chained exception for details.")
+
+
 class InvalidQuantumError(Exception):
     """Exception that may be raised by PipelineTasks (and code they delegate
     to) in order to indicate logic bug or configuration problem.
diff --git a/python/lsst/pipe/base/task.py b/python/lsst/pipe/base/task.py
index 2067eb926..14965b565 100644
--- a/python/lsst/pipe/base/task.py
+++ b/python/lsst/pipe/base/task.py
@@ -31,6 +31,7 @@
 from typing import TYPE_CHECKING, Any, ClassVar
 
 import lsst.utils
+import lsst.utils.introspection
 import lsst.utils.logging
 from lsst.pex.config import ConfigurableField
 from lsst.utils.timer import logInfo
diff --git a/tests/test_pipeTools.py b/tests/test_pipeTools.py
index be3274645..a10768440 100644
--- a/tests/test_pipeTools.py
+++ b/tests/test_pipeTools.py
@@ -122,12 +122,6 @@ def _makePipeline(tasks):
 class PipelineToolsTestCase(unittest.TestCase):
     """A test case for pipelineTools."""
 
-    def setUp(self):
-        pass
-
-    def tearDown(self):
-        pass
-
     def testIsOrdered(self):
         """Tests for pipeTools.isPipelineOrdered method."""
         pipeline = _makePipeline([("A", "B", "task1"), ("B", "C", "task2")])
diff --git a/tests/test_pipelineIR.py b/tests/test_pipelineIR.py
index b9323e4c8..36c321135 100644
--- a/tests/test_pipelineIR.py
+++ b/tests/test_pipelineIR.py
@@ -44,12 +44,6 @@ class ConfigIRTestCase(unittest.TestCase):
     so it should be tested here.
     """
 
-    def setUp(self):
-        pass
-
-    def tearDown(self):
-        pass
-
     def testMergeConfig(self):
         # Create some configs to merge
         config1 = ConfigIR(
@@ -84,12 +78,6 @@ def testMergeConfig(self):
 class PipelineIRTestCase(unittest.TestCase):
     """A test case for PipelineIR objects."""
 
-    def setUp(self):
-        pass
-
-    def tearDown(self):
-        pass
-
     def testPipelineIRInitChecks(self):
         # Missing description
         pipeline_str = """
diff --git a/tests/test_task.py b/tests/test_task.py
index bf246d2f0..3314864cf 100644
--- a/tests/test_task.py
+++ b/tests/test_task.py
@@ -139,12 +139,6 @@ def run(self, val):
 class TaskTestCase(unittest.TestCase):
     """A test case for Task."""
 
-    def setUp(self):
-        self.valDict = dict()
-
-    def tearDown(self):
-        self.valDict = None
-
     def testBasics(self):
         """Test basic construction and use of a task."""
         for addend in (1.1, -3.5):
@@ -325,6 +319,41 @@ def testTimeMethod(self):
             new_meta = yaml.safe_load(y)
             self.assertEqual(new_meta, addMultTask.metadata)
 
+    def test_annotate_exception(self):
+        """Test annotating failures in the task metadata when a non-Task
+        exception is raised (i.e., one with no ``metadata`` attribute).
+        """
+        task = AddMultTask()
+        msg = "something failed!"
+        error = ValueError(msg)
+        with self.assertLogs("addMult", level="ERROR") as cm:
+            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
+        self.assertIn(msg, "\n".join(cm.output))
+        self.assertEqual(task.metadata["failure"]["message"], msg)
+        self.assertEqual(task.metadata["failure"]["type"], "ValueError")
+        self.assertNotIn("metadata", task.metadata["failure"])
+
+    def test_annotate_task_exception(self):
+        """Test annotating failures in the task metadata when a Task-specific
+        exception is raised.
+        """
+
+        class TestError(pipeBase.AlgorithmError):
+            @property
+            def metadata(self):
+                return {"something": 12345}
+
+        task = AddMultTask()
+        msg = "something failed!"
+        error = TestError(msg)
+        with self.assertLogs("addMult", level="ERROR") as cm:
+            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
+        self.assertIn(msg, "\n".join(cm.output))
+        self.assertEqual(task.metadata["failure"]["message"], msg)
+        result = "test_task.TaskTestCase.test_annotate_task_exception.<locals>.TestError"
+        self.assertEqual(task.metadata["failure"]["type"], result)
+        self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)
+
 
 class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
     """Run file leak tests."""
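
How the `_quantumContext.py` hunk interacts with the partial-outputs pattern: `put` now skips any output connection whose attribute on the values Struct is `None`, rather than requiring an object for every connection. A rough sketch of a `runQuantum` that relies on this (the `catalog` connection name is illustrative, and `run` is assumed to return a Struct with a `catalog` field):

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # Pre-fill every output with None; anything still None at put() time
        # is skipped by the new getattr(..., None) guard instead of raising.
        result = pipeBase.Struct(catalog=None)
        try:
            result.catalog = self.run(**inputs).catalog
        finally:
            butlerQC.put(result, outputRefs)

This is what allows the `AnnotatedPartialOutputsError` example in `_status.py` to call `butlerQC.put(result, outputRefs)` in a ``finally`` block: outputs the failed `run` never produced are simply not written.
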