From d24620a84a02c4fbcbfd976f7667d86802333017 Mon Sep 17 00:00:00 2001 From: John Parejko Date: Thu, 25 Jan 2024 16:50:43 -0800 Subject: [PATCH] Add new Task exceptions and error annotation --- python/lsst/pipe/base/_status.py | 74 +++++++++++++++++++++++++++++++- python/lsst/pipe/base/task.py | 12 ++++++ tests/test_task.py | 47 ++++++++++++++++++++ 3 files changed, 131 insertions(+), 2 deletions(-) diff --git a/python/lsst/pipe/base/_status.py b/python/lsst/pipe/base/_status.py index 11a41cfb5..164ce92d4 100644 --- a/python/lsst/pipe/base/_status.py +++ b/python/lsst/pipe/base/_status.py @@ -27,7 +27,15 @@ from __future__ import annotations +import abc +import itertools + +from lsst.daf.base import PropertyList +from lsst.utils import introspection + __all__ = ( + "UnprocessableDataError", + "AnnotatedPartialOutputsError", "NoWorkFound", "RepeatableQuantumError", "InvalidQuantumError", @@ -47,8 +55,8 @@ class NoWorkFound(BaseException): logic to trap it. """ - -class RepeatableQuantumError(RuntimeError): +# TODO: can we make this a dataclass? +class RepeatableQuantumError(RuntimeError, abc.ABC): """Exception that may be raised by PipelineTasks (and code they delegate to) in order to indicate that a repeatable problem that will not be addressed by retries. @@ -73,6 +81,68 @@ class RepeatableQuantumError(RuntimeError): EXIT_CODE = 20 + @property + @abc.abstractmethod + def metadata(self): + """ + Metadata from the raising `~lsst.pipe.base.Task` with more information + about the failure. The contents of the dict are `~lsst.pipe.base.Task` + -dependent. + """ + pass + + +class UnprocessableDataError(RepeatableQuantumError): + """Exception that will be subclassed and raised by Tasks to indicate a + failure to process their inputs for some reason that is non-recoverable. + + Notes + ----- + An example is a known bright star that causes PSF measurement to fail, and + that makes that detector entirely non-recoverable. + + Do not raise this unless we are convinced that the data cannot be + processed, even by a better algorithm. + """ + + +class AnnotatedPartialOutputsError(RepeatableQuantumError): + """Exception that runQuantum raises when the (partial) outputs it has + written contain information about their own incompleteness or degraded + quality. + + This exception should always chain the original error. When the + executor catches this exception, it will report the original exception. In + contrast, other exceptions raised from ``runQuantum`` are considered to + invalidate any outputs that are already written. + This allows a failure to be "softened" in ``run``, but still raised in + ``runQuantum``, to allow partial outputs to still be written. + """ + @classmethod + def annotate(cls, exception: Exception, *, + task, exposures=(), catalogs=()) -> None: + """Set metadata on outputs to explain the nature of the failure. + + Notes + ----- + This can only be called from within an except block that has caught + the given exception. + """ + # NOTE: can't test this in pipe_base because afw is not a dependency! + import os; print(os.getpid()); import ipdb; ipdb.set_trace(); + for item in itertools.chain(exposures, catalogs): + if item is None: + continue + metadata = item.getMetadata() + metadata.add("failure_message", str(exception)) + metadata.add("failure_type", introspection.get_full_type_name(exception)) + if other := getattr(exception, "metadata", None): + metadata.add("failure_metadata", PropertyList.from_mapping(other)) + + task._annotate_failure(exception) + + return cls("Task completed with only partial outputs: see chained exception for details.") + class InvalidQuantumError(Exception): """Exception that may be raised by PipelineTasks (and code they delegate diff --git a/python/lsst/pipe/base/task.py b/python/lsst/pipe/base/task.py index 2067eb926..05c84aeba 100644 --- a/python/lsst/pipe/base/task.py +++ b/python/lsst/pipe/base/task.py @@ -31,8 +31,10 @@ from typing import TYPE_CHECKING, Any, ClassVar import lsst.utils +import lsst.utils.introspection import lsst.utils.logging from lsst.pex.config import ConfigurableField +from lsst.utils import introspection from lsst.utils.timer import logInfo if TYPE_CHECKING: @@ -370,6 +372,16 @@ class OtherTaskConfig(lsst.pex.config.Config): """ return ConfigurableField(doc=doc, target=cls) + def _annotate_failure(self, exception): + """Annotate this task's metadata with information about an exception + that was raised during the execution of the task. + """ + self.metadata["failure_message"] = str(exception) + self.metadata["failure_type"] = introspection.get_full_type_name(exception) + if metadata := getattr(exception, "metadata", None): + self.metadata["failure_metadata"] = metadata + + self.log.warning(str(exception)) def _computeFullName(self, name: str) -> str: """Compute the full name of a subtask or metadata item, given its brief name. diff --git a/tests/test_task.py b/tests/test_task.py index bf246d2f0..5f9db4ca2 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -325,6 +325,53 @@ def testTimeMethod(self): new_meta = yaml.safe_load(y) self.assertEqual(new_meta, addMultTask.metadata) + def test_annotate_failure_generic_exception(self): + """Test annotating failures in the task metadata when a non-Task + exception is raised. + """ + task = AddMultTask() + msg = "something failed!" + error = ValueError(msg) + with self.assertLogs("addMult", level="WARNING") as cm: + task._annotate_failure(error) + self.assertIn(msg, "\n".join(cm.output)) + self.assertEqual(task.metadata["failure_message"], msg) + self.assertEqual(task.metadata["failure_type"], "ValueError") + self.assertNotIn("failure_metadata", task.metadata) + + def test_annotate_failure_task_exception(self): + """Test annotating failures in the task metadata when a Task-specific + exception is raised. + """ + class TestError(pipeBase.RepeatableQuantumError): + @property + def metadata(self): + return {"something": 12345} + + task = AddMultTask() + msg = "something failed!" + error = TestError(msg) + with self.assertLogs("addMult", level="WARNING") as cm: + task._annotate_failure(error) + self.assertIn(msg, "\n".join(cm.output)) + self.assertEqual(task.metadata["failure_message"], msg) + result = "test_task.TaskTestCase.test_annotate_failure_task_exception..TestError" + self.assertEqual(task.metadata["failure_type"], result) + # TODO: can we make this work without it being in "`scalars`? + self.assertEqual(task.metadata["failure_metadata"].scalars, {"something": 12345}) + + # NOTE: can't test this in pipe_base because afw is not a dependency! + # def testAnnotatedPartialOutputsError(self): + # """Test that AnnotatedPartialOutputsError adds appropriate metadata to + # the task and datatypes. + # """ + # task = AddMultTask() + # exposure = + # error = ValueError("some failure") + # result = pipeBase.AnnotatedPartialOutputsError.annotate(error, + # task=task, + # ) + class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): """Run file leak tests."""