Skip to content

Commit

Permalink
New version of task exceptions and annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
parejkoj committed Feb 23, 2024
1 parent 53f7d8a commit 73295c3
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 60 deletions.
71 changes: 35 additions & 36 deletions python/lsst/pipe/base/_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations
from typing import Protocol

import abc
import itertools
import logging

from lsst.daf.base import PropertyList
from lsst.utils import introspection

from ._task_metadata import GetSetDictMetadata, NestedMetadataDict

__all__ = (
"UnprocessableDataError",
"AnnotatedPartialOutputsError",
Expand All @@ -41,10 +43,6 @@
"InvalidQuantumError",
)

from typing import Protocol

from ._task_metadata import GetSetDictMetadata


class GetSetDictMetadataHolder(Protocol):
"""Protocol for objects that have a ``metadata`` attribute that satisfies
Expand Down Expand Up @@ -95,13 +93,14 @@ class RepeatableQuantumError(RuntimeError, abc.ABC):

@property
@abc.abstractmethod
def metadata(self) -> None:
"""
Metadata from the raising `~lsst.pipe.base.Task` with more information
about the failure. The contents of the dict are `~lsst.pipe.base.Task`
-dependent.
def metadata(self) -> NestedMetadataDict | None:
"""Metadata from the raising `~lsst.pipe.base.Task` with more
information about the failure. The contents of the dict are
`~lsst.pipe.base.Task`-dependent, and must have `str` keys and `str`,
`int`, `float`, `bool`, or nested-dictionary (with the same key and
value types) values.
"""
pass
raise NotImplementedError


class UnprocessableDataError(RepeatableQuantumError):
Expand Down Expand Up @@ -130,19 +129,19 @@ class AnnotatedPartialOutputsError(RepeatableQuantumError):
"""

@classmethod
def annotate(cls, exception: Exception, *, task, exposures=(), catalogs=()) -> None:
def annotate(cls, exception: Exception, *args: GetSetDictMetadataHolder,
log: logging.Logger = None) -> AnnotatedPartialOutputsError:
"""Set metadata on outputs to explain the nature of the failure.
Parameters
----------
exception : `Exception`
Exception that caused the task to fail.
task : `lsst.pipe.base.Task`
Task that failed and that will be annotated.
exposures : `tuple` [`lsst.afw.image.Exposure`], optional
Exposures to annotate with information about the failure.
catalogs : `tuple` [`lsst.afw.table.Catalog`], optional
Catalogs to annotate with information about the failure.
*args : `GetSetDictMetadataHolder`
Objects (e.g. Task, Exposure, SimpleCatalog) to annotate with
failure information. They must have a `metadata` property.
log : `logging.Logger`, optional
Log to send error message to.
Returns
-------
Expand All @@ -154,8 +153,7 @@ def annotate(cls, exception: Exception, *, task, exposures=(), catalogs=()) -> N
-----
This should be called from within an except block that has caught an
exception. Here is an example of handling a failure in
``PipelineTask.runQuantum`` that annotates and writes partial outputs
outputs:
``PipelineTask.runQuantum`` that annotates and writes partial outputs:
.. code-block:: py
:name: annotate-error-example
Expand All @@ -170,30 +168,31 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
self.run(exposure)
except pipeBase.RepeatableQuantumError as e:
error = pipeBase.AnnotatedPartialOutputsError.annotate(
e,
catalogs=[result.catalog],
task=self)
e, self, result.catalog, log=self.log
)
raise error from e
finally:
butlerQC.put(result, outputRefs)
"""
# NOTE: Can't test this in pipe_base because afw is not a dependency;
# test_calibrateImage.py in pipe_tasks provides significant coverage.
for item in itertools.chain(exposures, catalogs):
failure_info = {
"message": str(exception),
"type": introspection.get_full_type_name(exception),
}
if other := getattr(exception, "metadata", None):
failure_info["metadata"] = other

# NOTE: Can't fully test this in pipe_base because afw is not a
# dependency; test_calibrateImage.py in pipe_tasks gives more coverage.
for item in args:
# Some outputs may not exist, so we cannot set metadata on them.
if item is None:
continue

Check warning on line 189 in python/lsst/pipe/base/_status.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/_status.py#L189

Added line #L189 was not covered by tests
if (metadata := item.getMetadata()) is None:
metadata = PropertyList()
item.setMetadata(metadata)
metadata.add("failure_message", str(exception))
metadata.add("failure_type", introspection.get_full_type_name(exception))
if other := getattr(exception, "metadata", None):
metadata.add("failure_metadata", PropertyList.from_mapping(other))
item.metadata.set_dict("failure", failure_info)

task.annotate_failure(exception)
if log:
log.error("Task failed with only partial outputs. %s: %s", exception, str(exception))

return cls("Task completed with only partial outputs: see chained exception for details.")
return cls("Task failed and wrote partial outputs: see chained exception for details.")


class InvalidQuantumError(Exception):
Expand Down
12 changes: 0 additions & 12 deletions python/lsst/pipe/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import lsst.utils.introspection
import lsst.utils.logging
from lsst.pex.config import ConfigurableField
from lsst.utils import introspection
from lsst.utils.timer import logInfo

if TYPE_CHECKING:
Expand Down Expand Up @@ -372,17 +371,6 @@ class OtherTaskConfig(lsst.pex.config.Config):
"""
return ConfigurableField(doc=doc, target=cls)

def annotate_failure(self, exception):
"""Annotate this task's metadata with information about an exception
that was raised during the execution of the task.
"""
self.metadata["failure_message"] = str(exception)
self.metadata["failure_type"] = introspection.get_full_type_name(exception)
if metadata := getattr(exception, "metadata", None):
self.metadata["failure_metadata"] = metadata

self.log.warning(str(exception))

def _computeFullName(self, name: str) -> str:
"""Compute the full name of a subtask or metadata item, given its brief
name.
Expand Down
24 changes: 12 additions & 12 deletions tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,21 +319,21 @@ def testTimeMethod(self):
new_meta = yaml.safe_load(y)
self.assertEqual(new_meta, addMultTask.metadata)

def test_annotate_failure_generic_exception(self):
def test_annotate_exception(self):
"""Test annotating failures in the task metadata when a non-Task
exception is raised.
exception is raised (when there is no `metadata` on the exception).
"""
task = AddMultTask()
msg = "something failed!"
error = ValueError(msg)
with self.assertLogs("addMult", level="WARNING") as cm:
task.annotate_failure(error)
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure_message"], msg)
self.assertEqual(task.metadata["failure_type"], "ValueError")
self.assertNotIn("failure_metadata", task.metadata)
self.assertEqual(task.metadata["failure"]["message"], msg)
self.assertEqual(task.metadata["failure"]["type"], "ValueError")
self.assertNotIn("metadata", task.metadata["failure"])

def test_annotate_failure_task_exception(self):
def test_annotate_task_exception(self):
"""Test annotating failures in the task metadata when a Task-specific
exception is raised.
"""
Expand All @@ -347,12 +347,12 @@ def metadata(self):
msg = "something failed!"
error = TestError(msg)
with self.assertLogs("addMult", level="WARNING") as cm:
task.annotate_failure(error)
pipeBase.AnnotatedPartialOutputsError.annotate(error, task, log=task.log)
self.assertIn(msg, "\n".join(cm.output))
self.assertEqual(task.metadata["failure_message"], msg)
result = "test_task.TaskTestCase.test_annotate_failure_task_exception.<locals>.TestError"
self.assertEqual(task.metadata["failure_type"], result)
self.assertEqual(task.metadata["failure_metadata"]["something"], 12345)
self.assertEqual(task.metadata["failure"]["message"], msg)
result = "test_task.TaskTestCase.test_annotate_task_exception.<locals>.TestError"
self.assertEqual(task.metadata["failure"]["type"], result)
self.assertEqual(task.metadata["failure"]["metadata"]["something"], 12345)


class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
Expand Down

0 comments on commit 73295c3

Please sign in to comment.