Skip to content

Commit

Permalink
Merge pull request #400 from lsst/tickets/DM-39842
Browse files Browse the repository at this point in the history
DM-39842: Add new Task exceptions and error annotation
  • Loading branch information
parejkoj authored Mar 22, 2024
2 parents 02714a3 + eb16be8 commit 7c47f2a
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 30 deletions.
6 changes: 4 additions & 2 deletions python/lsst/pipe/base/_quantumContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ def put(
``[calexp1, calexp2]``. Likewise, if there is a single ref, then
only a single object need be passed. The same restriction applies
if dataset is directly a `list` of `~lsst.daf.butler.DatasetRef`
or a single `~lsst.daf.butler.DatasetRef`.
or a single `~lsst.daf.butler.DatasetRef`. If ``values.NAME`` is
None, no output is written.
dataset : `OutputQuantizedConnection` or `list`[`DatasetRef`] \
or `DatasetRef`
This argument may either be an `InputQuantizedConnection` which
Expand All @@ -371,7 +372,8 @@ def put(
" attributes must be passed as the values to put"
)
for name, refs in dataset:
valuesAttribute = getattr(values, name)
if (valuesAttribute := getattr(values, name, None)) is None:
continue
if isinstance(refs, list | tuple):
if len(refs) != len(valuesAttribute):
raise ValueError(f"There must be a object to put for every Dataset ref in {name}")
Expand Down
132 changes: 128 additions & 4 deletions python/lsst/pipe/base/_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,23 @@

from __future__ import annotations

import abc
import logging
from typing import Protocol

from lsst.utils import introspection

from ._task_metadata import GetSetDictMetadata, NestedMetadataDict

__all__ = (
"UnprocessableDataError",
"AnnotatedPartialOutputsError",
"NoWorkFound",
"RepeatableQuantumError",
"AlgorithmError",
"InvalidQuantumError",
)

from typing import Protocol

from ._task_metadata import GetSetDictMetadata


class GetSetDictMetadataHolder(Protocol):
"""Protocol for objects that have a ``metadata`` attribute that satisfies
Expand Down Expand Up @@ -86,6 +93,123 @@ class RepeatableQuantumError(RuntimeError):
EXIT_CODE = 20


class AlgorithmError(RepeatableQuantumError, abc.ABC):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a repeatable algorithmic failure that will not be
    addressed by retries.

    Subclass this exception to define the metadata associated with the error
    (for example: number of data points in a fit vs. degrees of freedom).
    """

    @property
    @abc.abstractmethod
    def metadata(self) -> NestedMetadataDict | None:
        """Metadata from the raising `~lsst.pipe.base.Task` with more
        information about the failure. The contents of the dict are
        `~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
        `int`, `float`, `bool`, or nested-dictionary (with the same key and
        value types) values.
        """
        # Subclasses must override; the abstract property only defines the
        # contract consumed by AnnotatedPartialOutputsError.annotate.
        raise NotImplementedError


class UnprocessableDataError(RepeatableQuantumError):
    """Exception that will be subclassed and raised by Tasks to indicate a
    failure to process their inputs for some reason that is non-recoverable.

    Notes
    -----
    An example is a known bright star that causes PSF measurement to fail, and
    that makes that detector entirely non-recoverable.

    Do not raise this unless we are convinced that the data cannot be
    processed, even by a better algorithm. Most instances where this error
    would be raised likely require an RFC to explicitly define the situation.
    """


class AnnotatedPartialOutputsError(RepeatableQuantumError):
    """Exception raised by ``runQuantum`` when the (partial) outputs it has
    written contain information about their own incompleteness or degraded
    quality.

    This exception should always chain the original error. When the executor
    catches this exception, it will report the original exception. In
    contrast, other exceptions raised from ``runQuantum`` are considered to
    invalidate any outputs that are already written.
    """

    @classmethod
    def annotate(
        cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger
    ) -> AnnotatedPartialOutputsError:
        """Set metadata on outputs to explain the nature of the failure.

        Parameters
        ----------
        error : `Exception`
            Exception that caused the task to fail.
        *args : `GetSetDictMetadataHolder`
            Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
            failure information. They must have a ``metadata`` property.
        log : `logging.Logger`
            Log to send error message to.

        Returns
        -------
        error : `AnnotatedPartialOutputsError`
            Exception that the failing task can ``raise from`` with the
            passed-in exception.

        Notes
        -----
        Call this from within an ``except`` block that has caught ``error``.
        A typical use from ``PipelineTask.runQuantum``, annotating and writing
        partial outputs, looks like:

        .. code-block:: py
           :name: annotate-error-example

           def runQuantum(self, butlerQC, inputRefs, outputRefs):
               inputs = butlerQC.get(inputRefs)
               exposures = inputs.pop("exposures")
               assert not inputs, "runQuantum got more inputs than expected"
               result = pipeBase.Struct(catalog=None)
               try:
                   self.run(exposure)
               except pipeBase.AlgorithmError as e:
                   error = pipeBase.AnnotatedPartialOutputsError.annotate(
                       e, self, result.catalog, log=self.log
                   )
                   raise error from e
               finally:
                   butlerQC.put(result, outputRefs)
        """
        # Summary of the original failure, stored under the "failure" key in
        # each annotated object's metadata.
        failure_info: NestedMetadataDict = {
            "message": str(error),
            "type": introspection.get_full_type_name(error),
        }
        # AlgorithmError subclasses carry task-specific metadata; plain
        # exceptions do not, and an empty dict is deliberately omitted.
        extra = getattr(error, "metadata", None)
        if extra:
            failure_info["metadata"] = extra

        # NOTE: Can't fully test this in pipe_base because afw is not a
        # dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
        for target in args:
            # Some outputs may not exist, so we cannot set metadata on them.
            if target is not None:
                target.metadata.set_dict("failure", failure_info)  # type: ignore

        log.exception(
            "Task failed with only partial outputs; see exception message for details.",
            exc_info=error,
        )

        return cls("Task failed and wrote partial outputs: see chained exception for details.")


class InvalidQuantumError(Exception):
"""Exception that may be raised by PipelineTasks (and code they delegate
to) in order to indicate logic bug or configuration problem.
Expand Down
1 change: 1 addition & 0 deletions python/lsst/pipe/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from typing import TYPE_CHECKING, Any, ClassVar

import lsst.utils
import lsst.utils.introspection
import lsst.utils.logging
from lsst.pex.config import ConfigurableField
from lsst.utils.timer import logInfo
Expand Down
6 changes: 0 additions & 6 deletions tests/test_pipeTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ def _makePipeline(tasks):
class PipelineToolsTestCase(unittest.TestCase):
"""A test case for pipelineTools."""

def setUp(self):
pass

def tearDown(self):
pass

def testIsOrdered(self):
"""Tests for pipeTools.isPipelineOrdered method."""
pipeline = _makePipeline([("A", "B", "task1"), ("B", "C", "task2")])
Expand Down
12 changes: 0 additions & 12 deletions tests/test_pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ class ConfigIRTestCase(unittest.TestCase):
so it should be tested here.
"""

def setUp(self):
pass

def tearDown(self):
pass

def testMergeConfig(self):
# Create some configs to merge
config1 = ConfigIR(
Expand Down Expand Up @@ -84,12 +78,6 @@ def testMergeConfig(self):
class PipelineIRTestCase(unittest.TestCase):
"""A test case for PipelineIR objects."""

def setUp(self):
pass

def tearDown(self):
pass

def testPipelineIRInitChecks(self):
# Missing description
pipeline_str = """
Expand Down
41 changes: 35 additions & 6 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,6 @@ def run(self, val):
class TaskTestCase(unittest.TestCase):
"""A test case for Task."""

def setUp(self):
self.valDict = dict()

def tearDown(self):
self.valDict = None

def testBasics(self):
"""Test basic construction and use of a task."""
for addend in (1.1, -3.5):
Expand Down Expand Up @@ -325,6 +319,41 @@ def testTimeMethod(self):
new_meta = yaml.safe_load(y)
self.assertEqual(new_meta, addMultTask.metadata)

def test_annotate_exception(self):
"""Test annotating failures in the task metadata when a non-Task
exception is raised (when there is no `metadata` on the exception).
"""
task = AddMultTask()
msg = "something failed!"
error = ValueError(msg)
with self.assertLogs("addMult", level="ERROR") as cm:
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure"]["message"], msg)
self.assertEqual(task.metadata["failure"]["type"], "ValueError")
self.assertNotIn("metadata", task.metadata["failure"])

def test_annotate_task_exception(self):
"""Test annotating failures in the task metadata when a Task-specific
exception is raised.
"""

class TestError(pipeBase.AlgorithmError):
@property
def metadata(self):
return {"something": 12345}

task = AddMultTask()
msg = "something failed!"
error = TestError(msg)
with self.assertLogs("addMult", level="ERROR") as cm:
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure"]["message"], msg)
result = "test_task.TaskTestCase.test_annotate_task_exception.<locals>.TestError"
self.assertEqual(task.metadata["failure"]["type"], result)
self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
"""Run file leak tests."""
Expand Down

0 comments on commit 7c47f2a

Please sign in to comment.