From 7020ca43c47b7b6d1b8970448ee1c03d95b69cda Mon Sep 17 00:00:00 2001 From: John Parejko Date: Thu, 29 Jun 2023 16:58:19 -0700 Subject: [PATCH] Handle exceptions in CalibrateImage Use the new Task exceptions and a pre-defined results struct to manage partial outputs for failures of CalibrateImage subtasks. Add tests of runQuantum exception handling. The input is now a multiple `exposures`, so we can use that to simplify the code by renaming output_exposure->exposure. --- python/lsst/pipe/tasks/calibrateImage.py | 91 ++++++++++++++------- tests/test_calibrateImage.py | 100 +++++++++++++++++++++-- 2 files changed, 153 insertions(+), 38 deletions(-) diff --git a/python/lsst/pipe/tasks/calibrateImage.py b/python/lsst/pipe/tasks/calibrateImage.py index 75d26dbc8..5d0492f0f 100644 --- a/python/lsst/pipe/tasks/calibrateImage.py +++ b/python/lsst/pipe/tasks/calibrateImage.py @@ -76,9 +76,9 @@ class CalibrateImageConnections(pipeBase.PipelineTaskConnections, storageClass="SourceCatalog", ) - # TODO: We want some kind of flag on Exposures/Catalogs to make it obvious - # which components had failed to be computed/persisted - output_exposure = connectionTypes.Output( + # TODO DM-38732: We want some kind of flag on Exposures/Catalogs to make + # it obvious which components had failed to be computed/persisted. + exposure = connectionTypes.Output( doc="Photometrically calibrated exposure with fitted calibrations and summary statistics.", name="initial_pvi", storageClass="ExposureF", @@ -408,6 +408,8 @@ def __init__(self, initial_stars_schema=None, **kwargs): def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) + exposures = inputs.pop("exposures") + id_generator = self.config.id_generator.apply(butlerQC.quantum.dataId) astrometry_loader = lsst.meas.algorithms.ReferenceObjectLoader( @@ -424,12 +426,33 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): config=self.config.photometry_ref_loader, log=self.log) self.photometry.match.setRefObjLoader(photometry_loader) - outputs = self.run(id_generator=id_generator, **inputs) - - butlerQC.put(outputs, outputRefs) + # This should not happen with a properly configured execution context. + assert not inputs, "runQuantum got more inputs than expected" + + # Specify the fields that `annotate` needs below, to ensure they + # exist, even as None. + result = pipeBase.Struct(exposure=None, + stars_footprints=None, + psf_stars_footprints=None, + ) + try: + self.run(exposures=exposures, result=result, id_generator=id_generator) + except pipeBase.AlgorithmError as e: + error = pipeBase.AnnotatedPartialOutputsError.annotate( + e, + self, + result.exposure, + result.psf_stars_footprints, + result.stars_footprints, + log=self.log + ) + butlerQC.put(result, outputRefs) + raise error from e + + butlerQC.put(result, outputRefs) @timeMethod - def run(self, *, exposures, id_generator=None): + def run(self, *, exposures, id_generator=None, result=None): """Find stars and perform psf measurement, then do a deeper detection and measurement and calibrate astrometry and photometry from that. @@ -442,13 +465,17 @@ def run(self, *, exposures, id_generator=None): before doing further processing. id_generator : `lsst.meas.base.IdGenerator`, optional Object that generates source IDs and provides random seeds. + result : `lsst.pipe.base.Struct`, optional + Result struct that is modified to allow saving of partial outputs + for some failure conditions. If the task completes successfully, + this is also returned. Returns ------- result : `lsst.pipe.base.Struct` Results as a struct with attributes: - ``output_exposure`` + ``exposure`` Calibrated exposure, with pixels in nJy units. (`lsst.afw.image.Exposure`) ``stars`` @@ -477,40 +504,44 @@ def run(self, *, exposures, id_generator=None): Reference catalog stars matches used in the photometric fit. (`list` [`lsst.afw.table.ReferenceMatch`] or `lsst.afw.table.BaseCatalog`) """ + if result is None: + result = pipeBase.Struct() if id_generator is None: id_generator = lsst.meas.base.IdGenerator() - exposure = self._handle_snaps(exposures) + result.exposure = self._handle_snaps(exposures) # TODO remove on DM-43083: work around the fact that we don't want # to run streak detection in this task in production. - exposure.mask.addMaskPlane("STREAK") + result.exposure.mask.addMaskPlane("STREAK") - psf_stars, background, candidates = self._compute_psf(exposure, id_generator) + result.psf_stars_footprints, result.background, candidates = self._compute_psf(result.exposure, + id_generator) + result.psf_stars = result.psf_stars_footprints.asAstropy() - self._measure_aperture_correction(exposure, psf_stars) + self._measure_aperture_correction(result.exposure, result.psf_stars) - stars = self._find_stars(exposure, background, id_generator) - self._match_psf_stars(psf_stars, stars) + result.stars_footprints = self._find_stars(result.exposure, result.background, id_generator) + self._match_psf_stars(result.psf_stars_footprints, result.stars_footprints) + result.stars = result.stars_footprints.asAstropy() - astrometry_matches, astrometry_meta = self._fit_astrometry(exposure, stars) - stars, photometry_matches, photometry_meta, photo_calib = self._fit_photometry(exposure, stars) + astrometry_matches, astrometry_meta = self._fit_astrometry(result.exposure, result.stars_footprints) + if self.config.optional_outputs: + result.astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, + astrometry_meta) + + result.stars_footprints, photometry_matches, \ + photometry_meta, result.applied_photo_calib = self._fit_photometry(result.exposure, + result.stars_footprints) + # fit_photometry returns a new catalog, so we need a new astropy table view. + result.stars = result.stars_footprints.asAstropy() + if self.config.optional_outputs: + result.photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, + photometry_meta) - self._summarize(exposure, stars, background) + self._summarize(result.exposure, result.stars_footprints, result.background) - if self.config.optional_outputs: - astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, astrometry_meta) - photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, photometry_meta) - - return pipeBase.Struct(output_exposure=exposure, - stars_footprints=stars, - stars=stars.asAstropy(), - psf_stars_footprints=psf_stars, - psf_stars=psf_stars.asAstropy(), - background=background, - applied_photo_calib=photo_calib, - astrometry_matches=astrometry_matches, - photometry_matches=photometry_matches) + return result def _handle_snaps(self, exposure): """Combine two snaps into one exposure, or return a single exposure. diff --git a/tests/test_calibrateImage.py b/tests/test_calibrateImage.py index 598dde798..6944a4beb 100644 --- a/tests/test_calibrateImage.py +++ b/tests/test_calibrateImage.py @@ -20,6 +20,7 @@ # along with this program. If not, see . import unittest +from unittest import mock import tempfile import astropy.units as u @@ -27,6 +28,7 @@ import numpy as np import lsst.afw.image as afwImage +import lsst.afw.math as afwMath import lsst.afw.table as afwTable import lsst.daf.base import lsst.daf.butler @@ -150,16 +152,24 @@ def _check_run(self, calibrate, result): # re-estimation during source detection. self.assertEqual(len(result.background), 4) + # Both afw and astropy psf_stars catalogs should be populated. + self.assertEqual(result.psf_stars["calib_psf_used"].sum(), 3) + self.assertEqual(result.psf_stars_footprints["calib_psf_used"].sum(), 3) + # Check that the summary statistics are reasonable. - summary = result.output_exposure.info.getSummaryStats() + summary = result.exposure.info.getSummaryStats() self.assertFloatsAlmostEqual(summary.psfSigma, 2.0, rtol=1e-2) self.assertFloatsAlmostEqual(summary.ra, self.sky_center.getRa().asDegrees(), rtol=1e-7) self.assertFloatsAlmostEqual(summary.dec, self.sky_center.getDec().asDegrees(), rtol=1e-7) + # Should have finite sky coordinates in the afw and astropy catalogs. + self.assertTrue(np.isfinite(result.stars_footprints["coord_ra"]).all()) + self.assertTrue(np.isfinite(result.stars["coord_ra"]).all()) + # Returned photoCalib should be the applied value, not the ==1 one on the exposure. self.assertFloatsAlmostEqual(result.applied_photo_calib.getCalibrationMean(), self.photo_calib, rtol=2e-3) - # Should have flux/magnitudes in the afw and astropy catalogs + # Should have calibrated flux/magnitudes in the afw and astropy catalogs self.assertIn("slot_PsfFlux_flux", result.stars_footprints.schema) self.assertIn("slot_PsfFlux_mag", result.stars_footprints.schema) self.assertEqual(result.stars["slot_PsfFlux_flux"].unit, u.nJy) @@ -481,7 +491,7 @@ def test_runQuantum(self): "astrometry_ref_cat": [self.htm_id], "photometry_ref_cat": [self.htm_id], # outputs - "output_exposure": self.visit_id, + "exposure": self.visit_id, "stars": self.visit_id, "stars_footprints": self.visit_id, "background": self.visit_id, @@ -498,7 +508,7 @@ def test_runQuantum(self): self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707") self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110") # Check that the proper kwargs are passed to run(). - self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"}) + self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"}) def test_runQuantum_2_snaps(self): task = CalibrateImageTask() @@ -510,7 +520,7 @@ def test_runQuantum_2_snaps(self): "astrometry_ref_cat": [self.htm_id], "photometry_ref_cat": [self.htm_id], # outputs - "output_exposure": self.visit_id, + "exposure": self.visit_id, "stars": self.visit_id, "stars_footprints": self.visit_id, "background": self.visit_id, @@ -527,7 +537,7 @@ def test_runQuantum_2_snaps(self): self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707") self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110") # Check that the proper kwargs are passed to run(). - self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"}) + self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"}) def test_runQuantum_no_optional_outputs(self): config = CalibrateImageTask.ConfigClass() @@ -541,7 +551,7 @@ def test_runQuantum_no_optional_outputs(self): "astrometry_ref_cat": [self.htm_id], "photometry_ref_cat": [self.htm_id], # outputs - "output_exposure": self.visit_id, + "exposure": self.visit_id, "stars": self.visit_id, "stars_footprints": self.visit_id, "applied_photo_calib": self.visit_id, @@ -553,7 +563,7 @@ def test_runQuantum_no_optional_outputs(self): self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707") self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110") # Check that the proper kwargs are passed to run(). - self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "id_generator"}) + self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result", "id_generator"}) def test_lintConnections(self): """Check that the connections are self-consistent. @@ -561,6 +571,80 @@ def test_lintConnections(self): Connections = CalibrateImageTask.ConfigClass.ConnectionsClass lsst.pipe.base.testUtils.lintConnections(Connections) + def test_runQuantum_exception(self): + """Test exception handling in runQuantum. + """ + task = CalibrateImageTask() + lsst.pipe.base.testUtils.assertValidInitOutput(task) + + quantum = lsst.pipe.base.testUtils.makeQuantum( + task, self.butler, self.visit_id, + {"exposures": [self.exposure0_id], + "astrometry_ref_cat": [self.htm_id], + "photometry_ref_cat": [self.htm_id], + # outputs + "exposure": self.visit_id, + "stars": self.visit_id, + "stars_footprints": self.visit_id, + "background": self.visit_id, + "psf_stars": self.visit_id, + "psf_stars_footprints": self.visit_id, + "applied_photo_calib": self.visit_id, + "initial_pvi_background": self.visit_id, + "astrometry_matches": self.visit_id, + "photometry_matches": self.visit_id, + }) + + # A generic exception should raise directly. + msg = "mocked run exception" + with ( + mock.patch.object(task, "run", side_effect=ValueError(msg)), + self.assertRaisesRegex(ValueError, "mocked run exception") + ): + lsst.pipe.base.testUtils.runTestQuantum(task, self.butler, quantum, mockRun=False) + + # A AlgorimthError should write annotated partial outputs. + error = lsst.meas.algorithms.MeasureApCorrError(name="test", nSources=100, ndof=101) + + def mock_run(exposures, result=None, id_generator=None): + """Mock success through compute_psf, but failure after. + """ + result.exposure = afwImage.ExposureF(10, 10) + result.psf_stars_footprints = afwTable.SourceCatalog() + result.psf_stars = afwTable.SourceCatalog().asAstropy() + result.background = afwMath.BackgroundList() + raise error + + with ( + mock.patch.object(task, "run", side_effect=mock_run), + self.assertRaises(lsst.pipe.base.AnnotatedPartialOutputsError), + lsst.log.UsePythonLogging(), # so that assertLogs works with lsst.log + ): + with self.assertLogs("lsst.calibrateImage", level="ERROR") as cm: + lsst.pipe.base.testUtils.runTestQuantum(task, + self.butler, + quantum, + mockRun=False) + + logged = "\n".join(cm.output) + self.assertIn("Task failed with only partial outputs", logged) + self.assertIn("MeasureApCorrError", logged) + + # NOTE: This is an integration test of afw Exposure & SourceCatalog + # metadata with the error annotation system in pipe_base. + # Check that we did get the annotated partial outputs... + pvi = self.butler.get("initial_pvi", self.visit_id) + self.assertIn("Unable to measure aperture correction", pvi.metadata["failure.message"]) + self.assertIn("MeasureApCorrError", pvi.metadata["failure.type"]) + self.assertEqual(pvi.metadata["failure.metadata.ndof"], 101) + stars = self.butler.get("initial_psf_stars_footprints_detector", self.visit_id) + self.assertIn("Unable to measure aperture correction", stars.metadata["failure.message"]) + self.assertIn("MeasureApCorrError", stars.metadata["failure.type"]) + self.assertEqual(stars.metadata["failure.metadata.ndof"], 101) + # ... but not the un-produced outputs. + with self.assertRaises(FileNotFoundError): + self.butler.get("initial_stars_footprints_detector", self.visit_id) + def setup_module(module): lsst.utils.tests.init()