Skip to content

Commit

Permalink
Add new Task exceptions and error annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
parejkoj committed Mar 19, 2024
1 parent a649ff2 commit e720d5e
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 4 deletions.
132 changes: 128 additions & 4 deletions python/lsst/pipe/base/_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@

from __future__ import annotations

import abc
import logging
from typing import Protocol

from lsst.utils import introspection

from ._task_metadata import GetSetDictMetadata, NestedMetadataDict

__all__ = (
"UnprocessableDataError",
"AnnotatedPartialOutputsError",
"NoWorkFound",
"RepeatableQuantumError",
"AlgorithmError",
"InvalidQuantumError",
)

from typing import Protocol

from ._task_metadata import GetSetDictMetadata


class GetSetDictMetadataHolder(Protocol):
"""Protocol for objects that have a ``metadata`` attribute that satisfies
Expand Down Expand Up @@ -86,6 +93,123 @@ class RepeatableQuantumError(RuntimeError):
EXIT_CODE = 20


class AlgorithmError(RepeatableQuantumError, abc.ABC):
"""Exception that may be raised by PipelineTasks (and code they delegate
to) in order to indicate a repeatable algorithmic failure that will not be
addressed by retries.
Subclass this exception to define the metadata associated with the error
(for example: number of data points in a fit vs. degrees of freedom).
"""

@property
@abc.abstractmethod
def metadata(self) -> NestedMetadataDict | None:
"""Metadata from the raising `~lsst.pipe.base.Task` with more
information about the failure. The contents of the dict are
`~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
`int`, `float`, `bool`, or nested-dictionary (with the same key and
value types) values.
"""
raise NotImplementedError


class UnprocessableDataError(RepeatableQuantumError):
"""Exception that will be subclassed and raised by Tasks to indicate a
failure to process their inputs for some reason that is non-recoverable.
Notes
-----
An example is a known bright star that causes PSF measurement to fail, and
that makes that detector entirely non-recoverable.
Do not raise this unless we are convinced that the data cannot be
processed, even by a better algorithm. Most instances where this error
would be raised likely require an RFC to explicitly define the situation.
"""


class AnnotatedPartialOutputsError(RepeatableQuantumError):
"""Exception that runQuantum raises when the (partial) outputs it has
written contain information about their own incompleteness or degraded
quality.
This exception should always chain the original error. When the
executor catches this exception, it will report the original exception. In
contrast, other exceptions raised from ``runQuantum`` are considered to
invalidate any outputs that are already written.
"""

@classmethod
def annotate(
cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger
) -> AnnotatedPartialOutputsError:
"""Set metadata on outputs to explain the nature of the failure.
Parameters
----------
error : `Exception`
Exception that caused the task to fail.
*args : `GetSetDictMetadataHolder`
Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
failure information. They must have a `metadata` property.
log : `logging.Logger`
Log to send error message to.
Returns
-------
error : `AnnotatedPartialOutputsError`
Exception that the failing task can ``raise from`` with the
passed-in exception.
Notes
-----
This should be called from within an except block that has caught an
exception. Here is an example of handling a failure in
``PipelineTask.runQuantum`` that annotates and writes partial outputs:
.. code-block:: py
:name: annotate-error-example
def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
exposures = inputs.pop("exposures")
assert not inputs, "runQuantum got more inputs than expected"
result = pipeBase.Struct(catalog=None)
try:
self.run(exposure)
except pipeBase.AlgorithmError as e:
error = pipeBase.AnnotatedPartialOutputsError.annotate(
e, self, result.catalog, log=self.log
)
raise error from e
finally:
butlerQC.put(result, outputRefs)
"""
failure_info = {
"message": str(error),
"type": introspection.get_full_type_name(error),
}
if other := getattr(error, "metadata", None):
failure_info["metadata"] = other

# NOTE: Can't fully test this in pipe_base because afw is not a
# dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
for item in args:
# Some outputs may not exist, so we cannot set metadata on them.
if item is None:
continue
item.metadata.set_dict("failure", failure_info) # type: ignore

log.exception(
"Task failed with only partial outputs; see exception message for details.",
exc_info=error,
)

return cls("Task failed and wrote partial outputs: see chained exception for details.")


class InvalidQuantumError(Exception):
"""Exception that may be raised by PipelineTasks (and code they delegate
to) in order to indicate logic bug or configuration problem.
Expand Down
1 change: 1 addition & 0 deletions python/lsst/pipe/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from typing import TYPE_CHECKING, Any, ClassVar

import lsst.utils
import lsst.utils.introspection
import lsst.utils.logging
from lsst.pex.config import ConfigurableField
from lsst.utils.timer import logInfo
Expand Down
35 changes: 35 additions & 0 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,41 @@ def testTimeMethod(self):
new_meta = yaml.safe_load(y)
self.assertEqual(new_meta, addMultTask.metadata)

def test_annotate_exception(self):
"""Test annotating failures in the task metadata when a non-Task
exception is raised (when there is no `metadata` on the exception).
"""
task = AddMultTask()
msg = "something failed!"
error = ValueError(msg)
with self.assertLogs("addMult", level="ERROR") as cm:
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure"]["message"], msg)
self.assertEqual(task.metadata["failure"]["type"], "ValueError")
self.assertNotIn("metadata", task.metadata["failure"])

def test_annotate_task_exception(self):
"""Test annotating failures in the task metadata when a Task-specific
exception is raised.
"""

class TestError(pipeBase.AlgorithmError):
@property
def metadata(self):
return {"something": 12345}

task = AddMultTask()
msg = "something failed!"
error = TestError(msg)
with self.assertLogs("addMult", level="ERROR") as cm:
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure"]["message"], msg)
result = "test_task.TaskTestCase.test_annotate_task_exception.<locals>.TestError"
self.assertEqual(task.metadata["failure"]["type"], result)
self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
"""Run file leak tests."""
Expand Down

0 comments on commit e720d5e

Please sign in to comment.