Skip to content

Commit

Permalink
Add new Task exceptions and error annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
parejkoj committed Feb 9, 2024
1 parent a8697e8 commit d24620a
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 2 deletions.
74 changes: 72 additions & 2 deletions python/lsst/pipe/base/_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@

from __future__ import annotations

import abc
import itertools

from lsst.daf.base import PropertyList
from lsst.utils import introspection

__all__ = (
"UnprocessableDataError",
"AnnotatedPartialOutputsError",
"NoWorkFound",
"RepeatableQuantumError",
"InvalidQuantumError",
Expand All @@ -47,8 +55,8 @@ class NoWorkFound(BaseException):
logic to trap it.
"""


class RepeatableQuantumError(RuntimeError):
# TODO: can we make this a dataclass?
class RepeatableQuantumError(RuntimeError, abc.ABC):
"""Exception that may be raised by PipelineTasks (and code they delegate
to) in order to indicate that a repeatable problem that will not be
addressed by retries.
Expand All @@ -73,6 +81,68 @@ class RepeatableQuantumError(RuntimeError):

EXIT_CODE = 20

@property
@abc.abstractmethod
def metadata(self):
    """Extra information about the failure, supplied by the raising
    `~lsst.pipe.base.Task`.

    What the dict contains depends on the `~lsst.pipe.base.Task` that
    raised the exception. This declaration is abstract and its own body
    yields `None`.
    """
    return None


class UnprocessableDataError(RepeatableQuantumError):
    """Error that Tasks subclass and raise to signal that their inputs could
    not be processed, for a reason that is non-recoverable.

    Notes
    -----
    For example, a known bright star can make PSF measurement fail and leave
    that detector entirely non-recoverable.

    Only raise this when we are convinced that the data cannot be processed,
    even by a better algorithm.
    """


class AnnotatedPartialOutputsError(RepeatableQuantumError):
    """Exception that runQuantum raises when the (partial) outputs it has
    written contain information about their own incompleteness or degraded
    quality.

    This exception should always chain the original error. When the
    executor catches this exception, it will report the original exception. In
    contrast, other exceptions raised from ``runQuantum`` are considered to
    invalidate any outputs that are already written.

    This allows a failure to be "softened" in ``run``, but still raised in
    ``runQuantum``, to allow partial outputs to still be written.
    """

    @classmethod
    def annotate(
        cls, exception: Exception, *, task, exposures=(), catalogs=()
    ) -> "AnnotatedPartialOutputsError":
        """Set metadata on outputs to explain the nature of the failure.

        Parameters
        ----------
        exception : `Exception`
            Exception whose message, full type name and (if present and
            truthy) ``metadata`` attribute are recorded.
        task
            Object whose ``_annotate_failure`` method is called with
            ``exception`` after the outputs have been annotated.
        exposures : iterable, optional
            Output items providing ``getMetadata()``; `None` entries are
            skipped. Presumably afw exposures -- TODO confirm.
        catalogs : iterable, optional
            Handled exactly like ``exposures``. Presumably afw catalogs --
            TODO confirm.

        Returns
        -------
        error : `AnnotatedPartialOutputsError`
            A new instance of ``cls`` carrying a fixed message. It is
            returned, not raised, and no cause is attached to it here; the
            caller is expected to raise it chained to ``exception``.

        Notes
        -----
        This can only be called from within an except block that has caught
        the given exception.
        """
        # NOTE: can't test this in pipe_base because afw is not a dependency!
        for item in itertools.chain(exposures, catalogs):
            if item is None:
                continue
            metadata = item.getMetadata()
            metadata.add("failure_message", str(exception))
            metadata.add("failure_type", introspection.get_full_type_name(exception))
            # Only exceptions exposing a non-empty ``metadata`` attribute
            # contribute the extra "failure_metadata" entry.
            if other := getattr(exception, "metadata", None):
                metadata.add("failure_metadata", PropertyList.from_mapping(other))

        # Record the same failure information on the task itself.
        task._annotate_failure(exception)

        return cls("Task completed with only partial outputs: see chained exception for details.")


class InvalidQuantumError(Exception):
"""Exception that may be raised by PipelineTasks (and code they delegate
Expand Down
12 changes: 12 additions & 0 deletions python/lsst/pipe/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
from typing import TYPE_CHECKING, Any, ClassVar

import lsst.utils
import lsst.utils.introspection
import lsst.utils.logging
from lsst.pex.config import ConfigurableField
from lsst.utils import introspection
from lsst.utils.timer import logInfo

if TYPE_CHECKING:
Expand Down Expand Up @@ -370,6 +372,16 @@ class OtherTaskConfig(lsst.pex.config.Config):
"""
return ConfigurableField(doc=doc, target=cls)

def _annotate_failure(self, exception):
    """Record details of an exception raised while this task was executing
    in the task's metadata, then log the exception text as a warning.

    Parameters
    ----------
    exception : `Exception`
        The exception to describe. Its string form is stored under
        ``failure_message`` and its full type name under ``failure_type``.
        If it has a truthy ``metadata`` attribute, that value is stored
        under ``failure_metadata``.
    """
    message = str(exception)
    self.metadata["failure_message"] = message
    self.metadata["failure_type"] = introspection.get_full_type_name(exception)

    extra = getattr(exception, "metadata", None)
    if extra:
        self.metadata["failure_metadata"] = extra

    self.log.warning(message)

def _computeFullName(self, name: str) -> str:
"""Compute the full name of a subtask or metadata item, given its brief
name.
Expand Down
47 changes: 47 additions & 0 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,53 @@ def testTimeMethod(self):
new_meta = yaml.safe_load(y)
self.assertEqual(new_meta, addMultTask.metadata)

def test_annotate_failure_generic_exception(self):
    """Check the failure annotations written to the task metadata when the
    exception is not a Task-specific one.
    """
    message = "something failed!"
    failure = ValueError(message)
    task = AddMultTask()

    with self.assertLogs("addMult", level="WARNING") as captured:
        task._annotate_failure(failure)
    logged = "\n".join(captured.output)
    self.assertIn(message, logged)

    self.assertEqual(task.metadata["failure_message"], message)
    self.assertEqual(task.metadata["failure_type"], "ValueError")
    # A plain ValueError has no ``metadata`` attribute, so nothing extra
    # should have been recorded.
    self.assertNotIn("failure_metadata", task.metadata)

def test_annotate_failure_task_exception(self):
    """Check the failure annotations written to the task metadata when the
    exception is Task-specific and carries its own metadata.
    """

    class TestError(pipeBase.RepeatableQuantumError):
        @property
        def metadata(self):
            return {"something": 12345}

    message = "something failed!"
    failure = TestError(message)
    task = AddMultTask()

    with self.assertLogs("addMult", level="WARNING") as captured:
        task._annotate_failure(failure)
    logged = "\n".join(captured.output)
    self.assertIn(message, logged)

    self.assertEqual(task.metadata["failure_message"], message)
    expected_type = "test_task.TaskTestCase.test_annotate_failure_task_exception.<locals>.TestError"
    self.assertEqual(task.metadata["failure_type"], expected_type)
    # TODO: can we make this work without it being in `scalars`?
    expected_metadata = {"something": 12345}
    self.assertEqual(task.metadata["failure_metadata"].scalars, expected_metadata)

# NOTE: can't test this in pipe_base because afw is not a dependency!
# def testAnnotatedPartialOutputsError(self):
# """Test that AnnotatedPartialOutputsError adds appropriate metadata to
# the task and datatypes.
# """
# task = AddMultTask()
# exposure =
# error = ValueError("some failure")
# result = pipeBase.AnnotatedPartialOutputsError.annotate(error,
# task=task,
# )


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
"""Run file leak tests."""
Expand Down

0 comments on commit d24620a

Please sign in to comment.