From 73295c3325f342687010a4a5d02ee78d9532ece6 Mon Sep 17 00:00:00 2001 From: John Parejko Date: Fri, 23 Feb 2024 00:16:08 -0800 Subject: [PATCH] New version of task exceptions and annotation --- python/lsst/pipe/base/_status.py | 71 ++++++++++++++++---------------- python/lsst/pipe/base/task.py | 12 ------ tests/test_task.py | 24 +++++------ 3 files changed, 47 insertions(+), 60 deletions(-) diff --git a/python/lsst/pipe/base/_status.py b/python/lsst/pipe/base/_status.py index 751813e69..63059be7e 100644 --- a/python/lsst/pipe/base/_status.py +++ b/python/lsst/pipe/base/_status.py @@ -26,13 +26,15 @@ # along with this program. If not, see . from __future__ import annotations +from typing import Protocol import abc -import itertools +import logging -from lsst.daf.base import PropertyList from lsst.utils import introspection +from ._task_metadata import GetSetDictMetadata, NestedMetadataDict + __all__ = ( "UnprocessableDataError", "AnnotatedPartialOutputsError", @@ -41,10 +43,6 @@ "InvalidQuantumError", ) -from typing import Protocol - -from ._task_metadata import GetSetDictMetadata - class GetSetDictMetadataHolder(Protocol): """Protocol for objects that have a ``metadata`` attribute that satisfies @@ -95,13 +93,14 @@ class RepeatableQuantumError(RuntimeError, abc.ABC): @property @abc.abstractmethod - def metadata(self) -> None: - """ - Metadata from the raising `~lsst.pipe.base.Task` with more information - about the failure. The contents of the dict are `~lsst.pipe.base.Task` - -dependent. + def metadata(self) -> NestedMetadataDict | None: + """Metadata from the raising `~lsst.pipe.base.Task` with more + information about the failure. The contents of the dict are + `~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`, + `int`, `float`, `bool`, or nested-dictionary (with the same key and + value types) values. """ - pass + raise NotImplementedError class UnprocessableDataError(RepeatableQuantumError): @@ -130,19 +129,19 @@ class AnnotatedPartialOutputsError(RepeatableQuantumError): """ @classmethod - def annotate(cls, exception: Exception, *, task, exposures=(), catalogs=()) -> None: + def annotate(cls, exception: Exception, *args: GetSetDictMetadataHolder, + log: logging.Logger = None) -> AnnotatedPartialOutputsError: """Set metadata on outputs to explain the nature of the failure. Parameters ---------- exception : `Exception` Exception that caused the task to fail. - task : `lsst.pipe.base.Task` - Task that failed and that will be annotated. - exposures : `tuple` [`lsst.afw.image.Exposure`], optional - Exposures to annotate with information about the failure. - catalogs : `tuple` [`lsst.afw.table.Catalog`], optional - Catalogs to annotate with information about the failure. + *args : `GetSetDictMetadataHolder` + Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with + failure information. They must have a `metadata` property. + log : `logging.Logger`, optional + Log to send error message to. Returns ------- @@ -154,8 +153,7 @@ def annotate(cls, exception: Exception, *, task, exposures=(), catalogs=()) -> N ----- This should be called from within an except block that has caught an exception. Here is an example of handling a failure in - ``PipelineTask.runQuantum`` that annotates and writes partial outputs - outputs: + ``PipelineTask.runQuantum`` that annotates and writes partial outputs: .. code-block:: py :name: annotate-error-example @@ -170,30 +168,31 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): self.run(exposure) except pipeBase.RepeatableQuantumError as e: error = pipeBase.AnnotatedPartialOutputsError.annotate( - e, - catalogs=[result.catalog], - task=self) + e, self, result.catalog, log=self.log + ) raise error from e finally: butlerQC.put(result, outputRefs) """ - # NOTE: Can't test this in pipe_base because afw is not a dependency; - # test_calibrateImage.py in pipe_tasks provides significant coverage. - for item in itertools.chain(exposures, catalogs): + failure_info = { + "message": str(exception), + "type": introspection.get_full_type_name(exception), + } + if other := getattr(exception, "metadata", None): + failure_info["metadata"] = other + + # NOTE: Can't fully test this in pipe_base because afw is not a + # dependency; test_calibrateImage.py in pipe_tasks gives more coverage. + for item in args: # Some outputs may not exist, so we cannot set metadata on them. if item is None: continue - if (metadata := item.getMetadata()) is None: - metadata = PropertyList() - item.setMetadata(metadata) - metadata.add("failure_message", str(exception)) - metadata.add("failure_type", introspection.get_full_type_name(exception)) - if other := getattr(exception, "metadata", None): - metadata.add("failure_metadata", PropertyList.from_mapping(other)) + item.metadata.set_dict("failure", failure_info) - task.annotate_failure(exception) + if log: + log.error("Task failed with only partial outputs. %s: %s", exception, str(exception)) - return cls("Task completed with only partial outputs: see chained exception for details.") + return cls("Task failed and wrote partial outputs: see chained exception for details.") class InvalidQuantumError(Exception): diff --git a/python/lsst/pipe/base/task.py b/python/lsst/pipe/base/task.py index ce433c220..14965b565 100644 --- a/python/lsst/pipe/base/task.py +++ b/python/lsst/pipe/base/task.py @@ -34,7 +34,6 @@ import lsst.utils.introspection import lsst.utils.logging from lsst.pex.config import ConfigurableField -from lsst.utils import introspection from lsst.utils.timer import logInfo if TYPE_CHECKING: @@ -372,17 +371,6 @@ class OtherTaskConfig(lsst.pex.config.Config): """ return ConfigurableField(doc=doc, target=cls) - def annotate_failure(self, exception): - """Annotate this task's metadata with information about an exception - that was raised during the execution of the task. - """ - self.metadata["failure_message"] = str(exception) - self.metadata["failure_type"] = introspection.get_full_type_name(exception) - if metadata := getattr(exception, "metadata", None): - self.metadata["failure_metadata"] = metadata - - self.log.warning(str(exception)) - def _computeFullName(self, name: str) -> str: """Compute the full name of a subtask or metadata item, given its brief name. diff --git a/tests/test_task.py b/tests/test_task.py index bf30ef343..7fe262ec3 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -319,21 +319,21 @@ def testTimeMethod(self): new_meta = yaml.safe_load(y) self.assertEqual(new_meta, addMultTask.metadata) - def test_annotate_failure_generic_exception(self): + def test_annotate_exception(self): """Test annotating failures in the task metadata when a non-Task - exception is raised. + exception is raised (when there is no `metadata` on the exception). """ task = AddMultTask() msg = "something failed!" error = ValueError(msg) with self.assertLogs("addMult", level="WARNING") as cm: - task.annotate_failure(error) + pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log) self.assertIn(msg, "\n".join(cm.output)) - self.assertEqual(task.metadata["failure_message"], msg) - self.assertEqual(task.metadata["failure_type"], "ValueError") - self.assertNotIn("failure_metadata", task.metadata) + self.assertEqual(task.metadata["failure"]["message"], msg) + self.assertEqual(task.metadata["failure"]["type"], "ValueError") + self.assertNotIn("metadata", task.metadata["failure"]) - def test_annotate_failure_task_exception(self): + def test_annotate_task_exception(self): """Test annotating failures in the task metadata when a Task-specific exception is raised. """ @@ -347,12 +347,12 @@ def metadata(self): msg = "something failed!" error = TestError(msg) with self.assertLogs("addMult", level="WARNING") as cm: - task.annotate_failure(error) + pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log) self.assertIn(msg, "\n".join(cm.output)) - self.assertEqual(task.metadata["failure_message"], msg) - result = "test_task.TaskTestCase.test_annotate_failure_task_exception..TestError" - self.assertEqual(task.metadata["failure_type"], result) - self.assertEqual(task.metadata["failure_metadata"]["something"], 12345) + self.assertEqual(task.metadata["failure"]["message"], msg) + result = "test_task.TaskTestCase.test_annotate_task_exception..TestError" + self.assertEqual(task.metadata["failure"]["type"], result) + self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345) class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):