DM-39842: Add new Task exceptions and error annotation #400

Merged · 3 commits · Mar 22, 2024
6 changes: 4 additions & 2 deletions python/lsst/pipe/base/_quantumContext.py
@@ -348,7 +348,8 @@
            ``[calexp1, calexp2]``. Likewise, if there is a single ref, then
            only a single object need be passed. The same restriction applies
            if dataset is directly a `list` of `~lsst.daf.butler.DatasetRef`
-           or a single `~lsst.daf.butler.DatasetRef`.
+           or a single `~lsst.daf.butler.DatasetRef`. If ``values.NAME`` is
+           None, no output is written.
        dataset : `OutputQuantizedConnection` or `list`[`DatasetRef`] \
                or `DatasetRef`
            This argument may either be an `OutputQuantizedConnection` which
@@ -371,7 +372,8 @@
" attributes must be passed as the values to put"
)
for name, refs in dataset:
valuesAttribute = getattr(values, name)
if (valuesAttribute := getattr(values, name, None)) is None:
continue

            if isinstance(refs, list | tuple):
                if len(refs) != len(valuesAttribute):
                    raise ValueError(f"There must be an object to put for every Dataset ref in {name}")
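A minimal sketch of the calling pattern this change enables; the connection names, `run` signature, and `Struct` fields are illustrative, not taken from this PR:

```python
# Sketch of a PipelineTask.runQuantum using the None-skipping put().
# butlerQC, inputRefs, and outputRefs are supplied by the executor.
def runQuantum(self, butlerQC, inputRefs, outputRefs):
    inputs = butlerQC.get(inputRefs)
    # run() may legitimately produce no catalog for this quantum, e.g.
    # returning pipeBase.Struct(catalog=None, summary=...).
    result = self.run(**inputs)
    # Previously a None attribute raised; now "catalog" is silently
    # skipped and only the non-None outputs are written.
    butlerQC.put(result, outputRefs)
```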
132 changes: 128 additions & 4 deletions python/lsst/pipe/base/_status.py
@@ -27,16 +27,23 @@

from __future__ import annotations

+import abc
+import logging
+from typing import Protocol
+
+from lsst.utils import introspection
+
+from ._task_metadata import GetSetDictMetadata, NestedMetadataDict
+
__all__ = (
+    "UnprocessableDataError",
+    "AnnotatedPartialOutputsError",
    "NoWorkFound",
    "RepeatableQuantumError",
+    "AlgorithmError",
    "InvalidQuantumError",
)
-
-from typing import Protocol
-
-from ._task_metadata import GetSetDictMetadata

class GetSetDictMetadataHolder(Protocol):
"""Protocol for objects that have a ``metadata`` attribute that satisfies
Expand Down Expand Up @@ -86,6 +93,123 @@
    EXIT_CODE = 20


class AlgorithmError(RepeatableQuantumError, abc.ABC):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a repeatable algorithmic failure that will not be
    addressed by retries.

    Subclass this exception to define the metadata associated with the error
    (for example: number of data points in a fit vs. degrees of freedom).
    """

    @property
    @abc.abstractmethod
    def metadata(self) -> NestedMetadataDict | None:
        """Metadata from the raising `~lsst.pipe.base.Task` with more
        information about the failure. The contents of the dict are
        `~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
        `int`, `float`, `bool`, or nested-dictionary (with the same key and
        value types) values.
        """
        raise NotImplementedError

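For illustration, a minimal sketch of a concrete subclass; the `PsfFitError` name, constructor, and metadata fields are hypothetical, not part of this PR:

```python
import lsst.pipe.base as pipeBase

class PsfFitError(pipeBase.AlgorithmError):
    """Hypothetical error: a PSF fit failed for lack of data points."""

    def __init__(self, n_points, dof):
        super().__init__(f"PSF fit failed: {n_points} points for {dof} degrees of freedom")
        self._n_points = n_points
        self._dof = dof

    @property
    def metadata(self):
        # str keys with int values, matching the NestedMetadataDict contract.
        return {"n_points": self._n_points, "dof": self._dof}
```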

class UnprocessableDataError(RepeatableQuantumError):
    """Exception that will be subclassed and raised by Tasks to indicate a
    failure to process their inputs for some reason that is non-recoverable.

    Notes
    -----
    An example is a known bright star that causes PSF measurement to fail,
    making that detector entirely non-recoverable.

    Do not raise this unless we are convinced that the data cannot be
    processed, even by a better algorithm. Most instances where this error
    would be raised likely require an RFC to explicitly define the situation.
    """

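A sketch of how a Task might subclass this for the bright-star case described in the Notes; the name is hypothetical:

```python
import lsst.pipe.base as pipeBase

class BrightStarUnprocessableError(pipeBase.UnprocessableDataError):
    """Hypothetical: a known bright star makes PSF measurement on this
    detector permanently infeasible, under any algorithm."""
```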

class AnnotatedPartialOutputsError(RepeatableQuantumError):
    """Exception that ``runQuantum`` raises when the (partial) outputs it has
    written contain information about their own incompleteness or degraded
    quality.

    This exception should always chain the original error. When the
    executor catches this exception, it will report the original exception. In
    contrast, other exceptions raised from ``runQuantum`` are considered to
    invalidate any outputs that are already written.
    """

    @classmethod
    def annotate(
        cls, error: Exception, *args: GetSetDictMetadataHolder | None, log: logging.Logger
    ) -> AnnotatedPartialOutputsError:
        """Set metadata on outputs to explain the nature of the failure.

        Parameters
        ----------
        error : `Exception`
            Exception that caused the task to fail.
        *args : `GetSetDictMetadataHolder`
            Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
            failure information. They must have a ``metadata`` property.
        log : `logging.Logger`
            Log to send the error message to.

        Returns
        -------
        error : `AnnotatedPartialOutputsError`
            Exception that the failing task can ``raise from`` with the
            passed-in exception.

        Notes
        -----
        This should be called from within an except block that has caught an
        exception. Here is an example of handling a failure in
        ``PipelineTask.runQuantum`` that annotates and writes partial outputs:

        .. code-block:: py
            :name: annotate-error-example

            def runQuantum(self, butlerQC, inputRefs, outputRefs):
                inputs = butlerQC.get(inputRefs)
                exposures = inputs.pop("exposures")
                assert not inputs, "runQuantum got more inputs than expected"

                result = pipeBase.Struct(catalog=None)
                try:
                    self.run(exposures)
                except pipeBase.AlgorithmError as e:
                    error = pipeBase.AnnotatedPartialOutputsError.annotate(
                        e, self, result.catalog, log=self.log
                    )
                    raise error from e
                finally:
                    butlerQC.put(result, outputRefs)
        """
        failure_info = {
            "message": str(error),
            "type": introspection.get_full_type_name(error),
        }
        if other := getattr(error, "metadata", None):
            failure_info["metadata"] = other

        # NOTE: Can't fully test this in pipe_base because afw is not a
        # dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
        for item in args:
            # Some outputs may not exist, so we cannot set metadata on them.
            if item is None:
                continue

> Review comment on ``if item is None``: "Same Q as above, can this still have the desired behavior with the syntax `if not item`?"
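Sketching an answer to the review question: `if not item` would also skip falsy-but-present outputs (for example an empty catalog), which should still be annotated, whereas `is None` skips only outputs that were never produced:

```python
# An empty list stands in for an empty output catalog.
empty_catalog = []
print(not empty_catalog)      # True  -> `if not item` would wrongly skip it
print(empty_catalog is None)  # False -> `if item is None` still annotates it
```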

            item.metadata.set_dict("failure", failure_info)  # type: ignore

        log.exception(
            "Task failed with only partial outputs; see exception message for details.",
            exc_info=error,
        )

        return cls("Task failed and wrote partial outputs: see chained exception for details.")

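Given the code above, the annotation written to each output's metadata has roughly this shape; the values here are invented for illustration:

```python
# Illustrative contents of the "failure" entry that annotate() writes.
failure = {
    "message": "PSF fit failed: 3 points for 5 degrees of freedom",
    "type": "mymodule.PsfFitError",  # introspection.get_full_type_name(error)
    # "metadata" appears only when the exception provides it (AlgorithmError).
    "metadata": {"n_points": 3, "dof": 5},
}
```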

class InvalidQuantumError(Exception):
    """Exception that may be raised by PipelineTasks (and code they delegate
    to) in order to indicate a logic bug or configuration problem.
1 change: 1 addition & 0 deletions python/lsst/pipe/base/task.py
@@ -31,6 +31,7 @@

from typing import TYPE_CHECKING, Any, ClassVar

import lsst.utils
+import lsst.utils.introspection
import lsst.utils.logging
from lsst.pex.config import ConfigurableField
from lsst.utils.timer import logInfo
6 changes: 0 additions & 6 deletions tests/test_pipeTools.py
@@ -122,12 +122,6 @@ def _makePipeline(tasks):

class PipelineToolsTestCase(unittest.TestCase):
    """A test case for pipelineTools."""

-    def setUp(self):
-        pass
-
-    def tearDown(self):
-        pass
-
    def testIsOrdered(self):
        """Tests for pipeTools.isPipelineOrdered method."""
        pipeline = _makePipeline([("A", "B", "task1"), ("B", "C", "task2")])
12 changes: 0 additions & 12 deletions tests/test_pipelineIR.py
@@ -44,12 +44,6 @@ class ConfigIRTestCase(unittest.TestCase):

    so it should be tested here.
    """

-    def setUp(self):
-        pass
-
-    def tearDown(self):
-        pass
-
    def testMergeConfig(self):
        # Create some configs to merge
        config1 = ConfigIR(
@@ -84,12 +78,6 @@ def testMergeConfig(self):

class PipelineIRTestCase(unittest.TestCase):
    """A test case for PipelineIR objects."""

-    def setUp(self):
-        pass
-
-    def tearDown(self):
-        pass
-
    def testPipelineIRInitChecks(self):
        # Missing description
        pipeline_str = """
41 changes: 35 additions & 6 deletions tests/test_task.py
@@ -139,12 +139,6 @@ def run(self, val):

class TaskTestCase(unittest.TestCase):
    """A test case for Task."""

-    def setUp(self):
-        self.valDict = dict()
-
-    def tearDown(self):
-        self.valDict = None
-
    def testBasics(self):
        """Test basic construction and use of a task."""
        for addend in (1.1, -3.5):
@@ -325,6 +319,41 @@ def testTimeMethod(self):
        new_meta = yaml.safe_load(y)
        self.assertEqual(new_meta, addMultTask.metadata)

    def test_annotate_exception(self):
        """Test annotating failures in the task metadata when a non-Task
        exception is raised (when there is no `metadata` on the exception).
        """
        task = AddMultTask()
        msg = "something failed!"
        error = ValueError(msg)
        with self.assertLogs("addMult", level="ERROR") as cm:
            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
        self.assertIn(msg, "\n".join(cm.output))
        self.assertEqual(task.metadata["failure"]["message"], msg)
        self.assertEqual(task.metadata["failure"]["type"], "ValueError")
        self.assertNotIn("metadata", task.metadata["failure"])

    def test_annotate_task_exception(self):
        """Test annotating failures in the task metadata when a Task-specific
        exception is raised.
        """

        class TestError(pipeBase.AlgorithmError):
            @property
            def metadata(self):
                return {"something": 12345}

        task = AddMultTask()
        msg = "something failed!"
        error = TestError(msg)
        with self.assertLogs("addMult", level="ERROR") as cm:
            pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
        self.assertIn(msg, "\n".join(cm.output))
        self.assertEqual(task.metadata["failure"]["message"], msg)
        result = "test_task.TaskTestCase.test_annotate_task_exception.<locals>.TestError"
        self.assertEqual(task.metadata["failure"]["type"], result)
        self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
"""Run file leak tests."""