DM-39842: Configurable exception handling in CalibrateImageTask #835

Merged · 2 commits · Mar 22, 2024
91 changes: 61 additions & 30 deletions python/lsst/pipe/tasks/calibrateImage.py
@@ -76,9 +76,9 @@ class CalibrateImageConnections(pipeBase.PipelineTaskConnections,
storageClass="SourceCatalog",
)

# TODO: We want some kind of flag on Exposures/Catalogs to make it obvious
# which components had failed to be computed/persisted
output_exposure = connectionTypes.Output(
# TODO DM-38732: We want some kind of flag on Exposures/Catalogs to make
# it obvious which components had failed to be computed/persisted.
exposure = connectionTypes.Output(
doc="Photometrically calibrated exposure with fitted calibrations and summary statistics.",
name="initial_pvi",
storageClass="ExposureF",
@@ -408,6 +408,8 @@ def __init__(self, initial_stars_schema=None, **kwargs):

def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
exposures = inputs.pop("exposures")

id_generator = self.config.id_generator.apply(butlerQC.quantum.dataId)

astrometry_loader = lsst.meas.algorithms.ReferenceObjectLoader(
@@ -424,12 +426,33 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
config=self.config.photometry_ref_loader, log=self.log)
self.photometry.match.setRefObjLoader(photometry_loader)

outputs = self.run(id_generator=id_generator, **inputs)

butlerQC.put(outputs, outputRefs)
# This should not happen with a properly configured execution context.
assert not inputs, "runQuantum got more inputs than expected"
Contributor:
If/when this falls over, will the error message specify that the runQuantum being talked about is the one from calibrateImage?

Contributor Author:
Yes, the assert will give where it happened. This is a "this should absolutely never happen" error, hence the bare assert.


# Specify the fields that `annotate` needs below, to ensure they
# exist, even as None.
result = pipeBase.Struct(exposure=None,
stars_footprints=None,
psf_stars_footprints=None,
)
Member:
I'm torn as to whether it's better to spell out the possible fields here with None values, vs. letting run be the sole source of truth on those. I think the QuantumContext changes in pipe_base should allow either to work.

Contributor Author:
To simplify the call to annotate below (so it's not a mess of getattr), I need to at least specify the items that are passed to annotate. I don't like specifying a result list in both places (runQuantum and the top of run); I've changed it so run just makes an empty Struct (this caught a bug!) which is a bit better.

Contributor:
This feels like it could be fragile if we ever change this task to output more things in the result Struct. Is it not possible to construct a list of arguments to feed the error annotation by iterating over all the existing things in the Struct?

Contributor Author:
Not without using a whole bunch of getattr and tests of whether those things have a metadata property. In this design, the PipelineTask author has to specifically choose which output datasets could get annotated on failure and pass them to annotate individually. It might be nice to just pass the result struct and let annotate figure it out, but that's probably more complicated than it's worth (not all datasets support metadata, e.g. AstropyArrow).
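For reference, a minimal sketch of that iterate-over-the-Struct alternative (the `annotate_all` helper is hypothetical, not part of pipe_base; it assumes `Struct.getDict()` and the varargs `annotate(error, task, *outputs, log=...)` signature used in this diff):

    import lsst.pipe.base as pipeBase

    def annotate_all(error, task, result, log=None):
        # Keep only populated outputs that can carry metadata; datasets
        # without a `metadata` property (e.g. AstropyArrow tables) are skipped.
        candidates = [value for value in result.getDict().values()
                      if value is not None and hasattr(value, "metadata")]
        return pipeBase.AnnotatedPartialOutputsError.annotate(
            error, task, *candidates, log=log)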

try:
self.run(exposures=exposures, result=result, id_generator=id_generator)
except pipeBase.AlgorithmError as e:
error = pipeBase.AnnotatedPartialOutputsError.annotate(
e,
self,
result.exposure,
result.psf_stars_footprints,
result.stars_footprints,
log=self.log
)
butlerQC.put(result, outputRefs)
raise error from e

butlerQC.put(result, outputRefs)
Contributor:
Should this be in a finally block?

Contributor Author (@parejkoj, Mar 21, 2024):
No. We only butler.put if the task succeeded or if AlgorithmError is caught; all other errors can't be annotated, so writing partial outputs is unhelpful (and potentially harmful, as downstream tasks won't know that they're partial).
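For illustration, a downstream consumer could check for these annotations before trusting an output; a rough sketch (dataset and key names follow the tests in this PR; whether the `in` membership test works on the metadata PropertySet is an assumption):

    def warn_if_partial(butler, data_id, log):
        # Look for the failure annotations written by
        # AnnotatedPartialOutputsError.annotate on the calibrated exposure.
        pvi = butler.get("initial_pvi", data_id)
        metadata = pvi.metadata
        if metadata is not None and "failure.type" in metadata:
            log.warning("Partial output %s: %s",
                        metadata["failure.type"], metadata["failure.message"])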


@timeMethod
def run(self, *, exposures, id_generator=None):
def run(self, *, exposures, id_generator=None, result=None):
"""Find stars and perform psf measurement, then do a deeper detection
and measurement and calibrate astrometry and photometry from that.

@@ -442,13 +465,17 @@ def run(self, *, exposures, id_generator=None):
before doing further processing.
id_generator : `lsst.meas.base.IdGenerator`, optional
Object that generates source IDs and provides random seeds.
result : `lsst.pipe.base.Struct`, optional
Result struct that is modified to allow saving of partial outputs
for some failure conditions. If the task completes successfully,
this is also returned.

Returns
-------
result : `lsst.pipe.base.Struct`
Results as a struct with attributes:
Contributor:
Is the results Struct guaranteed to have all of the documented attributes, now?

Contributor Author:
I'm not sure what you mean?

If the task completes, it returns result. If an exception is raised, the input parameter has (hopefully!) been modified, but nothing is returned.

Contributor Author:
Oh, but apparently I never added docs for the result parameter: I'll do that now.

Contributor Author:
Here's what I added:

        result : `lsst.pipe.base.Struct`, optional
            Result struct that is modified to allow saving of partial outputs
            for some failure conditions. If the task completes successfully,
            this is also returned.
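So the contract, roughly sketched (abridged from the diff in this PR; `task` and `exposures` stand in for the runQuantum context):

    result = pipeBase.Struct(exposure=None,
                             stars_footprints=None,
                             psf_stars_footprints=None)
    try:
        # run() fills `result` in place as each stage completes, and
        # returns the same object on success.
        result = task.run(exposures=exposures, result=result)
    except pipeBase.AlgorithmError:
        # Nothing was returned, but `result` still holds whatever outputs
        # were populated before the failure; the rest remain None.
        pass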


``output_exposure``
``exposure``
Calibrated exposure, with pixels in nJy units.
(`lsst.afw.image.Exposure`)
``stars``
@@ -477,40 +504,44 @@ def run(self, *, exposures, id_generator=None):
Reference catalog stars matches used in the photometric fit.
(`list` [`lsst.afw.table.ReferenceMatch`] or `lsst.afw.table.BaseCatalog`)
"""
if result is None:
result = pipeBase.Struct()
if id_generator is None:
id_generator = lsst.meas.base.IdGenerator()

exposure = self._handle_snaps(exposures)
result.exposure = self._handle_snaps(exposures)

# TODO remove on DM-43083: work around the fact that we don't want
# to run streak detection in this task in production.
exposure.mask.addMaskPlane("STREAK")
result.exposure.mask.addMaskPlane("STREAK")

psf_stars, background, candidates = self._compute_psf(exposure, id_generator)
result.psf_stars_footprints, result.background, candidates = self._compute_psf(result.exposure,
id_generator)
result.psf_stars = result.psf_stars_footprints.asAstropy()

self._measure_aperture_correction(exposure, psf_stars)
self._measure_aperture_correction(result.exposure, result.psf_stars)

stars = self._find_stars(exposure, background, id_generator)
self._match_psf_stars(psf_stars, stars)
result.stars_footprints = self._find_stars(result.exposure, result.background, id_generator)
self._match_psf_stars(result.psf_stars_footprints, result.stars_footprints)
result.stars = result.stars_footprints.asAstropy()

astrometry_matches, astrometry_meta = self._fit_astrometry(exposure, stars)
stars, photometry_matches, photometry_meta, photo_calib = self._fit_photometry(exposure, stars)
astrometry_matches, astrometry_meta = self._fit_astrometry(result.exposure, result.stars_footprints)
if self.config.optional_outputs:
result.astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches,
astrometry_meta)

result.stars_footprints, photometry_matches, \
photometry_meta, result.applied_photo_calib = self._fit_photometry(result.exposure,
result.stars_footprints)
# fit_photometry returns a new catalog, so we need a new astropy table view.
result.stars = result.stars_footprints.asAstropy()
if self.config.optional_outputs:
result.photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches,
photometry_meta)

self._summarize(exposure, stars, background)
self._summarize(result.exposure, result.stars_footprints, result.background)
Contributor:
Why is this using result.stars_footprints and not result.stars ?

Contributor Author:
ComputeExposureSummaryStatsTask takes a SourceCatalog. result.stars is just for the output. Eventually we'll start converting downstream tasks (like ipdiffim, maybe?) to take AstropyArrow input catalogs, but that's a long ways off.

Contributor:
Ah, so stars_footprints and stars have the same information but in different formats? Cute, OK, carry on.

Contributor Author:
See the Connections here: https://github.com/lsst/pipe_tasks/blob/main/python/lsst/pipe/tasks/calibrateImage.py#L87

Eventually, the goal will be for stars_footprints to only contain the footprints and none of the catalog information, but that's a long ways off.
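A quick sketch of the relationship between the two outputs (using a minimal schema as a stand-in for the task's real one):

    import lsst.afw.table as afwTable

    # The afw SourceCatalog carries Footprints and feeds afw-based
    # consumers such as ComputeExposureSummaryStatsTask...
    schema = afwTable.SourceTable.makeMinimalSchema()
    stars_footprints = afwTable.SourceCatalog(schema)
    # ...while asAstropy() exposes the same columns as an astropy table
    # view for the tabular `stars` output.
    stars = stars_footprints.asAstropy()
    print(stars.colnames)  # e.g. ['id', 'coord_ra', 'coord_dec', 'parent']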


if self.config.optional_outputs:
astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, astrometry_meta)
photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, photometry_meta)

return pipeBase.Struct(output_exposure=exposure,
stars_footprints=stars,
stars=stars.asAstropy(),
psf_stars_footprints=psf_stars,
psf_stars=psf_stars.asAstropy(),
background=background,
applied_photo_calib=photo_calib,
astrometry_matches=astrometry_matches,
photometry_matches=photometry_matches)
return result

def _handle_snaps(self, exposure):
"""Combine two snaps into one exposure, or return a single exposure.
105 changes: 95 additions & 10 deletions tests/test_calibrateImage.py
@@ -20,13 +20,15 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import unittest
from unittest import mock
import tempfile

import astropy.units as u
from astropy.coordinates import SkyCoord
import numpy as np

import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
import lsst.afw.table as afwTable
import lsst.daf.base
import lsst.daf.butler
@@ -150,16 +152,24 @@ def _check_run(self, calibrate, result):
# re-estimation during source detection.
self.assertEqual(len(result.background), 4)

# Both afw and astropy psf_stars catalogs should be populated.
self.assertEqual(result.psf_stars["calib_psf_used"].sum(), 3)
self.assertEqual(result.psf_stars_footprints["calib_psf_used"].sum(), 3)

# Check that the summary statistics are reasonable.
summary = result.output_exposure.info.getSummaryStats()
summary = result.exposure.info.getSummaryStats()
self.assertFloatsAlmostEqual(summary.psfSigma, 2.0, rtol=1e-2)
self.assertFloatsAlmostEqual(summary.ra, self.sky_center.getRa().asDegrees(), rtol=1e-7)
self.assertFloatsAlmostEqual(summary.dec, self.sky_center.getDec().asDegrees(), rtol=1e-7)

# Should have finite sky coordinates in the afw and astropy catalogs.
self.assertTrue(np.isfinite(result.stars_footprints["coord_ra"]).all())
self.assertTrue(np.isfinite(result.stars["coord_ra"]).all())

# Returned photoCalib should be the applied value, not the ==1 one on the exposure.
self.assertFloatsAlmostEqual(result.applied_photo_calib.getCalibrationMean(),
self.photo_calib, rtol=2e-3)
# Should have flux/magnitudes in the afw and astropy catalogs
# Should have calibrated flux/magnitudes in the afw and astropy catalogs
self.assertIn("slot_PsfFlux_flux", result.stars_footprints.schema)
self.assertIn("slot_PsfFlux_mag", result.stars_footprints.schema)
self.assertEqual(result.stars["slot_PsfFlux_flux"].unit, u.nJy)
@@ -396,7 +406,7 @@ class CalibrateImageTaskRunQuantumTests(lsst.utils.tests.TestCase):
def setUp(self):
instrument = "testCam"
exposure0 = 101
exposure1 = 101
exposure1 = 102
visit = 100101
detector = 42

@@ -412,10 +422,10 @@ def setUp(self):
self.repo.registry.syncDimensionData("instrument", instrumentRecord)

# dataIds for fake data
butlerTests.addDataIdValue(self.repo, "detector", detector)
butlerTests.addDataIdValue(self.repo, "exposure", exposure0)
butlerTests.addDataIdValue(self.repo, "exposure", exposure1)
butlerTests.addDataIdValue(self.repo, "visit", visit)
butlerTests.addDataIdValue(self.repo, "detector", detector)

# inputs
butlerTests.addDatasetType(self.repo, "postISRCCD", {"instrument", "exposure", "detector"},
@@ -465,6 +475,7 @@ def setUp(self):
# put empty data
self.butler = butlerTests.makeTestCollection(self.repo)
self.butler.put(afwImage.ExposureF(), "postISRCCD", self.exposure0_id)
self.butler.put(afwImage.ExposureF(), "postISRCCD", self.exposure1_id)
self.butler.put(afwTable.SimpleCatalog(), "gaia_dr3_20230707", self.htm_id)
self.butler.put(afwTable.SimpleCatalog(), "ps1_pv3_3pi_20170110", self.htm_id)

@@ -481,7 +492,7 @@ def test_runQuantum(self):
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"output_exposure": self.visit_id,
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"background": self.visit_id,
Expand All @@ -498,7 +509,7 @@ def test_runQuantum(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"})

def test_runQuantum_2_snaps(self):
task = CalibrateImageTask()
@@ -510,7 +521,7 @@ def test_runQuantum_2_snaps(self):
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"output_exposure": self.visit_id,
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"background": self.visit_id,
Expand All @@ -527,7 +538,7 @@ def test_runQuantum_2_snaps(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"})

def test_runQuantum_no_optional_outputs(self):
config = CalibrateImageTask.ConfigClass()
@@ -541,7 +552,7 @@ def test_runQuantum_no_optional_outputs(self):
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"output_exposure": self.visit_id,
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"applied_photo_calib": self.visit_id,
@@ -553,14 +564,88 @@ def test_runQuantum_no_optional_outputs(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"})

def test_lintConnections(self):
"""Check that the connections are self-consistent.
"""
Connections = CalibrateImageTask.ConfigClass.ConnectionsClass
lsst.pipe.base.testUtils.lintConnections(Connections)

def test_runQuantum_exception(self):
"""Test exception handling in runQuantum.
"""
task = CalibrateImageTask()
lsst.pipe.base.testUtils.assertValidInitOutput(task)

quantum = lsst.pipe.base.testUtils.makeQuantum(
task, self.butler, self.visit_id,
{"exposures": [self.exposure0_id],
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"background": self.visit_id,
"psf_stars": self.visit_id,
"psf_stars_footprints": self.visit_id,
"applied_photo_calib": self.visit_id,
"initial_pvi_background": self.visit_id,
"astrometry_matches": self.visit_id,
"photometry_matches": self.visit_id,
})

# A generic exception should raise directly.
msg = "mocked run exception"
with (
mock.patch.object(task, "run", side_effect=ValueError(msg)),
self.assertRaisesRegex(ValueError, "mocked run exception")
):
lsst.pipe.base.testUtils.runTestQuantum(task, self.butler, quantum, mockRun=False)

# An AlgorithmError should write annotated partial outputs.
error = lsst.meas.algorithms.MeasureApCorrError(name="test", nSources=100, ndof=101)

def mock_run(exposures, result=None, id_generator=None):
"""Mock success through compute_psf, but failure after.
"""
result.exposure = afwImage.ExposureF(10, 10)
result.psf_stars_footprints = afwTable.SourceCatalog()
result.psf_stars = afwTable.SourceCatalog().asAstropy()
result.background = afwMath.BackgroundList()
raise error

with (
mock.patch.object(task, "run", side_effect=mock_run),
self.assertRaises(lsst.pipe.base.AnnotatedPartialOutputsError),
lsst.log.UsePythonLogging(), # so that assertLogs works with lsst.log
):
with self.assertLogs("lsst.calibrateImage", level="ERROR") as cm:
lsst.pipe.base.testUtils.runTestQuantum(task,
self.butler,
quantum,
mockRun=False)

logged = "\n".join(cm.output)
self.assertIn("Task failed with only partial outputs", logged)
self.assertIn("MeasureApCorrError", logged)

# NOTE: This is an integration test of afw Exposure & SourceCatalog
# metadata with the error annotation system in pipe_base.
# Check that we did get the annotated partial outputs...
pvi = self.butler.get("initial_pvi", self.visit_id)
self.assertIn("Unable to measure aperture correction", pvi.metadata["failure.message"])
self.assertIn("MeasureApCorrError", pvi.metadata["failure.type"])
self.assertEqual(pvi.metadata["failure.metadata.ndof"], 101)
stars = self.butler.get("initial_psf_stars_footprints_detector", self.visit_id)
self.assertIn("Unable to measure aperture correction", stars.metadata["failure.message"])
self.assertIn("MeasureApCorrError", stars.metadata["failure.type"])
self.assertEqual(stars.metadata["failure.metadata.ndof"], 101)
# ... but not the un-produced outputs.
with self.assertRaises(FileNotFoundError):
self.butler.get("initial_stars_footprints_detector", self.visit_id)


def setup_module(module):
lsst.utils.tests.init()