Skip to content

Commit

Permalink
Merge pull request #835 from lsst/tickets/DM-39842
Browse files Browse the repository at this point in the history
DM-39842: Configurable exception handling in CalibrateImageTask
  • Loading branch information
parejkoj authored Mar 22, 2024
2 parents a740e07 + f2d3e67 commit 88ce0b6
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 40 deletions.
91 changes: 61 additions & 30 deletions python/lsst/pipe/tasks/calibrateImage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ class CalibrateImageConnections(pipeBase.PipelineTaskConnections,
storageClass="SourceCatalog",
)

# TODO: We want some kind of flag on Exposures/Catalogs to make it obvious
# which components had failed to be computed/persisted
output_exposure = connectionTypes.Output(
# TODO DM-38732: We want some kind of flag on Exposures/Catalogs to make
# it obvious which components had failed to be computed/persisted.
exposure = connectionTypes.Output(
doc="Photometrically calibrated exposure with fitted calibrations and summary statistics.",
name="initial_pvi",
storageClass="ExposureF",
Expand Down Expand Up @@ -408,6 +408,8 @@ def __init__(self, initial_stars_schema=None, **kwargs):

def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
exposures = inputs.pop("exposures")

id_generator = self.config.id_generator.apply(butlerQC.quantum.dataId)

astrometry_loader = lsst.meas.algorithms.ReferenceObjectLoader(
Expand All @@ -424,12 +426,33 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
config=self.config.photometry_ref_loader, log=self.log)
self.photometry.match.setRefObjLoader(photometry_loader)

outputs = self.run(id_generator=id_generator, **inputs)

butlerQC.put(outputs, outputRefs)
# This should not happen with a properly configured execution context.
assert not inputs, "runQuantum got more inputs than expected"

# Specify the fields that `annotate` needs below, to ensure they
# exist, even as None.
result = pipeBase.Struct(exposure=None,
stars_footprints=None,
psf_stars_footprints=None,
)
try:
self.run(exposures=exposures, result=result, id_generator=id_generator)
except pipeBase.AlgorithmError as e:
error = pipeBase.AnnotatedPartialOutputsError.annotate(
e,
self,
result.exposure,
result.psf_stars_footprints,
result.stars_footprints,
log=self.log
)
butlerQC.put(result, outputRefs)
raise error from e

butlerQC.put(result, outputRefs)

@timeMethod
def run(self, *, exposures, id_generator=None):
def run(self, *, exposures, id_generator=None, result=None):
"""Find stars and perform psf measurement, then do a deeper detection
and measurement and calibrate astrometry and photometry from that.
Expand All @@ -442,13 +465,17 @@ def run(self, *, exposures, id_generator=None):
before doing further processing.
id_generator : `lsst.meas.base.IdGenerator`, optional
Object that generates source IDs and provides random seeds.
result : `lsst.pipe.base.Struct`, optional
Result struct that is modified to allow saving of partial outputs
for some failure conditions. If the task completes successfully,
this is also returned.
Returns
-------
result : `lsst.pipe.base.Struct`
Results as a struct with attributes:
``output_exposure``
``exposure``
Calibrated exposure, with pixels in nJy units.
(`lsst.afw.image.Exposure`)
``stars``
Expand Down Expand Up @@ -477,40 +504,44 @@ def run(self, *, exposures, id_generator=None):
Reference catalog stars matches used in the photometric fit.
(`list` [`lsst.afw.table.ReferenceMatch`] or `lsst.afw.table.BaseCatalog`)
"""
if result is None:
result = pipeBase.Struct()
if id_generator is None:
id_generator = lsst.meas.base.IdGenerator()

exposure = self._handle_snaps(exposures)
result.exposure = self._handle_snaps(exposures)

# TODO remove on DM-43083: work around the fact that we don't want
# to run streak detection in this task in production.
exposure.mask.addMaskPlane("STREAK")
result.exposure.mask.addMaskPlane("STREAK")

psf_stars, background, candidates = self._compute_psf(exposure, id_generator)
result.psf_stars_footprints, result.background, candidates = self._compute_psf(result.exposure,
id_generator)
result.psf_stars = result.psf_stars_footprints.asAstropy()

self._measure_aperture_correction(exposure, psf_stars)
self._measure_aperture_correction(result.exposure, result.psf_stars)

stars = self._find_stars(exposure, background, id_generator)
self._match_psf_stars(psf_stars, stars)
result.stars_footprints = self._find_stars(result.exposure, result.background, id_generator)
self._match_psf_stars(result.psf_stars_footprints, result.stars_footprints)
result.stars = result.stars_footprints.asAstropy()

astrometry_matches, astrometry_meta = self._fit_astrometry(exposure, stars)
stars, photometry_matches, photometry_meta, photo_calib = self._fit_photometry(exposure, stars)
astrometry_matches, astrometry_meta = self._fit_astrometry(result.exposure, result.stars_footprints)
if self.config.optional_outputs:
result.astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches,
astrometry_meta)

result.stars_footprints, photometry_matches, \
photometry_meta, result.applied_photo_calib = self._fit_photometry(result.exposure,
result.stars_footprints)
# fit_photometry returns a new catalog, so we need a new astropy table view.
result.stars = result.stars_footprints.asAstropy()
if self.config.optional_outputs:
result.photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches,
photometry_meta)

self._summarize(exposure, stars, background)
self._summarize(result.exposure, result.stars_footprints, result.background)

if self.config.optional_outputs:
astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, astrometry_meta)
photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, photometry_meta)

return pipeBase.Struct(output_exposure=exposure,
stars_footprints=stars,
stars=stars.asAstropy(),
psf_stars_footprints=psf_stars,
psf_stars=psf_stars.asAstropy(),
background=background,
applied_photo_calib=photo_calib,
astrometry_matches=astrometry_matches,
photometry_matches=photometry_matches)
return result

def _handle_snaps(self, exposure):
"""Combine two snaps into one exposure, or return a single exposure.
Expand Down
105 changes: 95 additions & 10 deletions tests/test_calibrateImage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import unittest
from unittest import mock
import tempfile

import astropy.units as u
from astropy.coordinates import SkyCoord
import numpy as np

import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
import lsst.afw.table as afwTable
import lsst.daf.base
import lsst.daf.butler
Expand Down Expand Up @@ -150,16 +152,24 @@ def _check_run(self, calibrate, result):
# re-estimation during source detection.
self.assertEqual(len(result.background), 4)

# Both afw and astropy psf_stars catalogs should be populated.
self.assertEqual(result.psf_stars["calib_psf_used"].sum(), 3)
self.assertEqual(result.psf_stars_footprints["calib_psf_used"].sum(), 3)

# Check that the summary statistics are reasonable.
summary = result.output_exposure.info.getSummaryStats()
summary = result.exposure.info.getSummaryStats()
self.assertFloatsAlmostEqual(summary.psfSigma, 2.0, rtol=1e-2)
self.assertFloatsAlmostEqual(summary.ra, self.sky_center.getRa().asDegrees(), rtol=1e-7)
self.assertFloatsAlmostEqual(summary.dec, self.sky_center.getDec().asDegrees(), rtol=1e-7)

# Should have finite sky coordinates in the afw and astropy catalogs.
self.assertTrue(np.isfinite(result.stars_footprints["coord_ra"]).all())
self.assertTrue(np.isfinite(result.stars["coord_ra"]).all())

# Returned photoCalib should be the applied value, not the ==1 one on the exposure.
self.assertFloatsAlmostEqual(result.applied_photo_calib.getCalibrationMean(),
self.photo_calib, rtol=2e-3)
# Should have flux/magnitudes in the afw and astropy catalogs
# Should have calibrated flux/magnitudes in the afw and astropy catalogs
self.assertIn("slot_PsfFlux_flux", result.stars_footprints.schema)
self.assertIn("slot_PsfFlux_mag", result.stars_footprints.schema)
self.assertEqual(result.stars["slot_PsfFlux_flux"].unit, u.nJy)
Expand Down Expand Up @@ -396,7 +406,7 @@ class CalibrateImageTaskRunQuantumTests(lsst.utils.tests.TestCase):
def setUp(self):
instrument = "testCam"
exposure0 = 101
exposure1 = 101
exposure1 = 102
visit = 100101
detector = 42

Expand All @@ -412,10 +422,10 @@ def setUp(self):
self.repo.registry.syncDimensionData("instrument", instrumentRecord)

# dataIds for fake data
butlerTests.addDataIdValue(self.repo, "detector", detector)
butlerTests.addDataIdValue(self.repo, "exposure", exposure0)
butlerTests.addDataIdValue(self.repo, "exposure", exposure1)
butlerTests.addDataIdValue(self.repo, "visit", visit)
butlerTests.addDataIdValue(self.repo, "detector", detector)

# inputs
butlerTests.addDatasetType(self.repo, "postISRCCD", {"instrument", "exposure", "detector"},
Expand Down Expand Up @@ -465,6 +475,7 @@ def setUp(self):
# put empty data
self.butler = butlerTests.makeTestCollection(self.repo)
self.butler.put(afwImage.ExposureF(), "postISRCCD", self.exposure0_id)
self.butler.put(afwImage.ExposureF(), "postISRCCD", self.exposure1_id)
self.butler.put(afwTable.SimpleCatalog(), "gaia_dr3_20230707", self.htm_id)
self.butler.put(afwTable.SimpleCatalog(), "ps1_pv3_3pi_20170110", self.htm_id)

Expand All @@ -481,7 +492,7 @@ def test_runQuantum(self):
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"output_exposure": self.visit_id,
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"background": self.visit_id,
Expand All @@ -498,7 +509,7 @@ def test_runQuantum(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"})

def test_runQuantum_2_snaps(self):
task = CalibrateImageTask()
Expand All @@ -510,7 +521,7 @@ def test_runQuantum_2_snaps(self):
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"output_exposure": self.visit_id,
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"background": self.visit_id,
Expand All @@ -527,7 +538,7 @@ def test_runQuantum_2_snaps(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"})

def test_runQuantum_no_optional_outputs(self):
config = CalibrateImageTask.ConfigClass()
Expand All @@ -541,7 +552,7 @@ def test_runQuantum_no_optional_outputs(self):
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"output_exposure": self.visit_id,
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"applied_photo_calib": self.visit_id,
Expand All @@ -553,14 +564,88 @@ def test_runQuantum_no_optional_outputs(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"})

def test_lintConnections(self):
"""Check that the connections are self-consistent.
"""
Connections = CalibrateImageTask.ConfigClass.ConnectionsClass
lsst.pipe.base.testUtils.lintConnections(Connections)

def test_runQuantum_exception(self):
"""Test exception handling in runQuantum.
"""
task = CalibrateImageTask()
lsst.pipe.base.testUtils.assertValidInitOutput(task)

quantum = lsst.pipe.base.testUtils.makeQuantum(
task, self.butler, self.visit_id,
{"exposures": [self.exposure0_id],
"astrometry_ref_cat": [self.htm_id],
"photometry_ref_cat": [self.htm_id],
# outputs
"exposure": self.visit_id,
"stars": self.visit_id,
"stars_footprints": self.visit_id,
"background": self.visit_id,
"psf_stars": self.visit_id,
"psf_stars_footprints": self.visit_id,
"applied_photo_calib": self.visit_id,
"initial_pvi_background": self.visit_id,
"astrometry_matches": self.visit_id,
"photometry_matches": self.visit_id,
})

# A generic exception should raise directly.
msg = "mocked run exception"
with (
mock.patch.object(task, "run", side_effect=ValueError(msg)),
self.assertRaisesRegex(ValueError, "mocked run exception")
):
lsst.pipe.base.testUtils.runTestQuantum(task, self.butler, quantum, mockRun=False)

# A AlgorimthError should write annotated partial outputs.
error = lsst.meas.algorithms.MeasureApCorrError(name="test", nSources=100, ndof=101)

def mock_run(exposures, result=None, id_generator=None):
"""Mock success through compute_psf, but failure after.
"""
result.exposure = afwImage.ExposureF(10, 10)
result.psf_stars_footprints = afwTable.SourceCatalog()
result.psf_stars = afwTable.SourceCatalog().asAstropy()
result.background = afwMath.BackgroundList()
raise error

with (
mock.patch.object(task, "run", side_effect=mock_run),
self.assertRaises(lsst.pipe.base.AnnotatedPartialOutputsError),
lsst.log.UsePythonLogging(), # so that assertLogs works with lsst.log
):
with self.assertLogs("lsst.calibrateImage", level="ERROR") as cm:
lsst.pipe.base.testUtils.runTestQuantum(task,
self.butler,
quantum,
mockRun=False)

logged = "\n".join(cm.output)
self.assertIn("Task failed with only partial outputs", logged)
self.assertIn("MeasureApCorrError", logged)

# NOTE: This is an integration test of afw Exposure & SourceCatalog
# metadata with the error annotation system in pipe_base.
# Check that we did get the annotated partial outputs...
pvi = self.butler.get("initial_pvi", self.visit_id)
self.assertIn("Unable to measure aperture correction", pvi.metadata["failure.message"])
self.assertIn("MeasureApCorrError", pvi.metadata["failure.type"])
self.assertEqual(pvi.metadata["failure.metadata.ndof"], 101)
stars = self.butler.get("initial_psf_stars_footprints_detector", self.visit_id)
self.assertIn("Unable to measure aperture correction", stars.metadata["failure.message"])
self.assertIn("MeasureApCorrError", stars.metadata["failure.type"])
self.assertEqual(stars.metadata["failure.metadata.ndof"], 101)
# ... but not the un-produced outputs.
with self.assertRaises(FileNotFoundError):
self.butler.get("initial_stars_footprints_detector", self.visit_id)


def setup_module(module):
lsst.utils.tests.init()
Expand Down

0 comments on commit 88ce0b6

Please sign in to comment.