From acb9fdc19f6b7a8f2c091c4961c42055e0df97e1 Mon Sep 17 00:00:00 2001 From: John Parejko Date: Thu, 29 Jun 2023 16:58:19 -0700 Subject: [PATCH] Handle exceptions in CalibrateImage Use the new Task exceptions and a pre-defined results struct to manage partial outputs for failures of CalibrateImage subtasks. --- python/lsst/pipe/tasks/calibrateImage.py | 72 ++++++++++++++++------- tests/test_calibrateImage.py | 75 ++++++++++++++++++++++-- 2 files changed, 121 insertions(+), 26 deletions(-) diff --git a/python/lsst/pipe/tasks/calibrateImage.py b/python/lsst/pipe/tasks/calibrateImage.py index ca20a19b68..74474e3e60 100644 --- a/python/lsst/pipe/tasks/calibrateImage.py +++ b/python/lsst/pipe/tasks/calibrateImage.py @@ -76,8 +76,8 @@ class CalibrateImageConnections(pipeBase.PipelineTaskConnections, storageClass="SourceCatalog", ) - # TODO: We want some kind of flag on Exposures/Catalogs to make it obvious - # which components had failed to be computed/persisted + # TODO DM-38732: We want some kind of flag on Exposures/Catalogs to make + # it obvious which components had failed to be computed/persisted. output_exposure = connectionTypes.Output( doc="Photometrically calibrated exposure with fitted calibrations and summary statistics.", name="initial_pvi", @@ -398,6 +398,7 @@ def __init__(self, initial_stars_schema=None, **kwargs): def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) + exposures = inputs.pop("exposures") astrometry_loader = lsst.meas.algorithms.ReferenceObjectLoader( dataIds=[ref.datasetRef.dataId for ref in inputRefs.astrometry_ref_cat], @@ -413,12 +414,31 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): config=self.config.photometry_ref_loader, log=self.log) self.photometry.match.setRefObjLoader(photometry_loader) - outputs = self.run(**inputs) - - butlerQC.put(outputs, outputRefs) + # This should not happen with a properly configured execution context. + assert not inputs, "runQuantum got more inputs than expected" + + result = pipeBase.Struct(exposure=None, + stars=None, + background=None, + psf_stars=None, + applied_photo_calib=None, + astrometry_matches=None, + photometry_matches=None, + ) + try: + self.run(exposures=exposures, result=result) + except lsst.pipe.base.RepeatableQuantumError as e: + error = pipeBase.AnnotatedPartialOutputsError.annotate( + e, + exposures=[result.exposure], + catalogs=[result.psf_stars, result.stars], + task=self + ) + butlerQC.put(result, outputRefs) + raise error from e @timeMethod - def run(self, *, exposures): + def run(self, *, exposures, result=None): """Find stars and perform psf measurement, then do a deeper detection and measurement and calibrate astrometry and photometry from that. @@ -458,30 +478,38 @@ def run(self, *, exposures): Reference catalog stars matches used in the photometric fit. (`list` [`lsst.afw.table.ReferenceMatch`] or `lsst.afw.table.BaseCatalog`) """ - exposure = self._handle_snaps(exposures) - - psf_stars, background, candidates = self._compute_psf(exposure) + if result is None: + result = pipeBase.Struct(output_exposure=None, + stars=None, + background=None, + psf_stars=None, + applied_photo_calib=None, + astrometry_matches=None, + photometry_matches=None, + ) - self._measure_aperture_correction(exposure, psf_stars) + result.output_exposure = self._handle_snaps(exposures) - stars = self._find_stars(exposure, background) + result.psf_stars, result.background, candidates = self._compute_psf(result.output_exposure) - astrometry_matches, astrometry_meta = self._fit_astrometry(exposure, stars) - stars, photometry_matches, photometry_meta, photo_calib = self._fit_photometry(exposure, stars) + self._measure_aperture_correction(result.output_exposure, result.psf_stars) - self._summarize(exposure, stars, background) + stars = self._find_stars(result.output_exposure, result.background) + result.stars = stars + astrometry_matches, astrometry_meta = self._fit_astrometry(result.output_exposure, stars) + if self.config.optional_outputs: + result.astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, + astrometry_meta) + result.stars, photometry_matches, \ + photometry_meta, result.applied_photo_calib = self._fit_photometry(result.output_exposure, + result.stars) if self.config.optional_outputs: - astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, astrometry_meta) photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, photometry_meta) - return pipeBase.Struct(output_exposure=exposure, - stars=stars, - psf_stars=psf_stars, - background=background, - applied_photo_calib=photo_calib, - astrometry_matches=astrometry_matches, - photometry_matches=photometry_matches) + self._summarize(result.output_exposure, stars, result.background) + + return result def _handle_snaps(self, exposure): """Combine two snaps into one exposure, or return a single exposure. diff --git a/tests/test_calibrateImage.py b/tests/test_calibrateImage.py index a02ca3ed9c..5e17356f99 100644 --- a/tests/test_calibrateImage.py +++ b/tests/test_calibrateImage.py @@ -20,6 +20,7 @@ # along with this program. If not, see . import unittest +from unittest import mock import tempfile import astropy.units as u @@ -139,7 +140,7 @@ def _check_run(self, calibrate, result, *, photo_calib): # Returned photoCalib should be the applied value, not the ==1 one on the exposure. self.assertFloatsAlmostEqual(result.applied_photo_calib.getCalibrationMean(), photo_calib, rtol=2e-3) - # Should have flux/magnitudes in the catalog. + # Should have calibrated flux/magnitudes in the catalog. self.assertIn("slot_PsfFlux_flux", result.stars.schema) self.assertIn("slot_PsfFlux_mag", result.stars.schema) @@ -322,6 +323,72 @@ def test_match_psf_stars(self): # Too few sources to reserve any in these tests. self.assertEqual(stars["calib_psf_reserved"].sum(), 0) + def test_exceptions_raise(self): + """Check that the task raises immediately on errors. + """ + self.config.on_exception = "raise" + calibrate = CalibrateImageTask(config=self.config) + calibrate.astrometry.setRefObjLoader(self.ref_loader) + calibrate.photometry.match.setRefObjLoader(self.ref_loader) + with mock.patch.object(calibrate, "psf_detection", + spec=lsst.meas.algorithms.SourceDetectionTask) as patch: + msg = "mocked detection exception" + patch.run.side_effect = ValueError(msg) + # Should only raise the inner exception, no other. + with self.assertRaisesRegex(ValueError, msg): + calibrate.run(exposure=self.exposure) + # TODO: check that the exception has information about where it was raised in the payload. + + def test_exceptions_write_and_raise(self): + """Check that the task returns some results and the exception. + """ + self.config.on_exception = "write_and_raise" + calibrate = CalibrateImageTask(config=self.config) + calibrate.astrometry.setRefObjLoader(self.ref_loader) + calibrate.photometry.match.setRefObjLoader(self.ref_loader) + with mock.patch.object(calibrate, "star_deblend", + spec=lsst.meas.deblender.SourceDeblendTask) as patch: + msg = "mocked deblend exception" + patch.run.side_effect = ValueError(msg) + result = calibrate.run(exposure=self.exposure) + self.assertIsNotNone(result.output_exposure) + self.assertIsNotNone(result.psf_stars) + self.assertIsNotNone(result.background) + self.assertIsNone(result.stars) + self.assertIsNone(result.astrometry_matches) + self.assertIsNone(result.photometry_matches) + self.assertIsNone(result.applied_photo_calib) + self.assertIsInstance(result.exception, ValueError) + self.assertEqual(result.exception.args, (msg, )) + + def test_exceptions_write_and_log(self): + """Check that the task logs the exception appropriately and also + returns some results and the exception. + """ + # Return some output and a log message with no exception. + self.config.on_exception = "write_and_log" + calibrate = CalibrateImageTask(config=self.config) + calibrate.astrometry.setRefObjLoader(self.ref_loader) + calibrate.photometry.match.setRefObjLoader(self.ref_loader) + with mock.patch.object(calibrate, "astrometry", + spec=lsst.meas.astrom.AstrometryTask) as patch: + msg = "mocked astrometry exception" + patch.run.side_effect = ValueError(msg) + with lsst.log.UsePythonLogging(): # so that assertLogs works with lsst.log + with self.assertLogs("lsst.calibrateImage", level="ERROR") as cm: + result = calibrate.run(exposure=self.exposure) + self.assertIn(msg, "\n".join(cm.output)) + self.assertIn("writing available datasets", "\n".join(cm.output)) + self.assertIsNotNone(result.output_exposure) + self.assertIsNotNone(result.psf_stars) + self.assertIsNotNone(result.background) + self.assertIsNotNone(result.stars) + self.assertIsNone(result.astrometry_matches) + self.assertIsNone(result.photometry_matches) + self.assertIsNone(result.applied_photo_calib) + self.assertIsInstance(result.exception, ValueError) + self.assertEqual(result.exception.args, (msg, )) + class CalibrateImageTaskRunQuantumTests(lsst.utils.tests.TestCase): """Tests of ``CalibrateImageTask.runQuantum``, which need a test butler, @@ -418,7 +485,7 @@ def test_runQuantum(self): self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707") self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110") # Check that the proper kwargs are passed to run(). - self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures"}) + self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result"}) def test_runQuantum_2_snaps(self): task = CalibrateImageTask() @@ -445,7 +512,7 @@ def test_runQuantum_2_snaps(self): self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707") self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110") # Check that the proper kwargs are passed to run(). - self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures"}) + self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result"}) def test_runQuantum_no_optional_outputs(self): config = CalibrateImageTask.ConfigClass() @@ -470,7 +537,7 @@ def test_runQuantum_no_optional_outputs(self): self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707") self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110") # Check that the proper kwargs are passed to run(). - self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures"}) + self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result"}) def test_lintConnections(self): """Check that the connections are self-consistent.