From 93c41a8c52d4fc1f845cdb937a5d701cb58ea79c Mon Sep 17 00:00:00 2001 From: John Parejko Date: Thu, 29 Jun 2023 16:58:19 -0700 Subject: [PATCH] Configurable exception handling in CalibrateImageTask --- python/lsst/pipe/tasks/calibrateImage.py | 61 +++++++++++++++++---- tests/test_calibrateImage.py | 68 ++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 11 deletions(-) diff --git a/python/lsst/pipe/tasks/calibrateImage.py b/python/lsst/pipe/tasks/calibrateImage.py index 4275453642..cb73adb9f5 100644 --- a/python/lsst/pipe/tasks/calibrateImage.py +++ b/python/lsst/pipe/tasks/calibrateImage.py @@ -72,8 +72,8 @@ class CalibrateImageConnections(pipeBase.PipelineTaskConnections, storageClass="SourceCatalog", ) - # TODO: We want some kind of flag on Exposures/Catalogs to make it obvious - # which components had failed to be computed/persisted + # TODO DM-38732: We want some kind of flag on Exposures/Catalogs to make + # it obvious which components had failed to be computed/persisted. output_exposure = connectionTypes.Output( doc="Photometrically calibrated exposure with fitted calibrations and summary statistics.", name="initial_pvi", @@ -135,6 +135,18 @@ def __init__(self, *, config=None): class CalibrateImageConfig(pipeBase.PipelineTaskConfig, pipelineConnections=CalibrateImageConnections): + # TODO: We want some kind of flag on Exposures/Catalogs to make it obvious + # which components had failed to be computed/persisted + on_exception = pexConfig.ChoiceField( + doc="How to handle exceptions in subtasks.", + dtype=str, + allowed={"raise": "Immediately raise exceptions, without writing any output.", + "write_and_raise": "Write as much output as possible, and then re-raise.", + "write_and_log": "Write as much output as possible and log the exception to `log.error`.", + }, + default="write_and_raise" + ) + optional_outputs = pexConfig.ListField( doc="Which optional outputs to save (as their connection name)?", dtype=str, @@ -393,6 +405,9 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): butlerQC.put(outputs, outputRefs) + if "raise" in self.config.on_exception and isinstance(outputs.exception, BaseException): + raise outputs.exception + @timeMethod def run(self, *, exposure): """Find stars and perform psf measurement, then do a deeper detection @@ -432,20 +447,44 @@ def run(self, *, exposure): Reference catalog stars matches used in the photometric fit. (`list` [`lsst.afw.table.ReferenceMatch`] or `lsst.afw.table.BaseCatalog`) """ - psf_stars, background, candidates = self._compute_psf(exposure) + try: + psf_stars, background, candidates = self._compute_psf(exposure) + + self._measure_aperture_correction(exposure, psf_stars) + + stars = self._find_stars(exposure, background) + + astrometry_matches, astrometry_meta = self._fit_astrometry(exposure, stars) + stars, photometry_matches, photometry_meta, photo_calib = self._fit_photometry(exposure, stars) + + self._summarize(exposure, stars, background) - self._measure_aperture_correction(exposure, psf_stars) + if self.config.optional_outputs: + astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, astrometry_meta) + photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, photometry_meta) + except Exception as e: + if self.config.on_exception == "raise": + raise - stars = self._find_stars(exposure, background) + result = pipeBase.Struct(output_exposure=exposure) + result.stars = locals().get("stars", None) + result.psf_stars = locals().get("psf_stars", None) + result.background = locals().get("background", None) + result.astrometry_matches = locals().get("astrometry_matches", None) + result.photometry_matches = locals().get("photometry_matches", None) + result.applied_photo_calib = locals().get("photo_calib", None) - astrometry_matches, astrometry_meta = self._fit_astrometry(exposure, stars) - stars, photometry_matches, photometry_meta, photo_calib = self._fit_photometry(exposure, stars) + # TODO: do we need to call _summarize here? Have to be careful if + # `stars` doesn't exist... + self._summarize(exposure, result.stars, result.background) - self._summarize(exposure, stars, background) + if "log" in self.config.on_exception: + # self.log.error("Exception in CalibrateImage.run: %s",) + self.log.exception("Error in CalibrateImage; writing available datasets: %s", + ", ".join(vars(result).keys())) - if self.config.optional_outputs: - astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, astrometry_meta) - photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, photometry_meta) + result.exception = e + return result return pipeBase.Struct(output_exposure=exposure, stars=stars, diff --git a/tests/test_calibrateImage.py b/tests/test_calibrateImage.py index 9fca3c9c3b..7c55d7dcb8 100644 --- a/tests/test_calibrateImage.py +++ b/tests/test_calibrateImage.py @@ -20,6 +20,7 @@ # along with this program. If not, see . import unittest +from unittest import mock import tempfile import astropy.units as u @@ -97,6 +98,7 @@ def setUp(self): # Test-specific configuration: self.config = CalibrateImageTask.ConfigClass() + self.config.on_exception = "raise" # fail immediately on all errors # We don't have many sources, so have to fit simpler models. self.config.psf_detection.background.approxOrderX = 1 self.config.star_detection.background.approxOrderX = 1 @@ -262,6 +264,72 @@ def test_photometry(self): self.assertFloatsAlmostEqual(stars['slot_PsfFlux_flux'], self.truth_cat['truth_flux'][idx], rtol=0.1) self.assertFloatsAlmostEqual(stars['slot_PsfFlux_mag'], self.truth_cat['truth_mag'][idx], rtol=0.01) + def test_exceptions_raise(self): + """Check that the task raises immediately on errors. + """ + self.config.on_exception = "raise" + calibrate = CalibrateImageTask(config=self.config) + calibrate.astrometry.setRefObjLoader(self.ref_loader) + calibrate.photometry.match.setRefObjLoader(self.ref_loader) + with mock.patch.object(calibrate, "psf_detection", + spec=lsst.meas.algorithms.SourceDetectionTask) as patch: + msg = "mocked detection exception" + patch.run.side_effect = ValueError(msg) + # Should only raise the inner exception, no other. + with self.assertRaisesRegex(ValueError, msg): + calibrate.run(exposure=self.exposure) + # TODO: check that the exception has information about where it was raised in the payload. + + def test_exceptions_write_and_raise(self): + """Check that the task returns some results and the exception. + """ + self.config.on_exception = "write_and_raise" + calibrate = CalibrateImageTask(config=self.config) + calibrate.astrometry.setRefObjLoader(self.ref_loader) + calibrate.photometry.match.setRefObjLoader(self.ref_loader) + with mock.patch.object(calibrate, "star_deblend", + spec=lsst.meas.deblender.SourceDeblendTask) as patch: + msg = "mocked deblend exception" + patch.run.side_effect = ValueError(msg) + result = calibrate.run(exposure=self.exposure) + self.assertIsNotNone(result.output_exposure) + self.assertIsNotNone(result.psf_stars) + self.assertIsNotNone(result.background) + self.assertIsNone(result.stars) + self.assertIsNone(result.astrometry_matches) + self.assertIsNone(result.photometry_matches) + self.assertIsNone(result.applied_photo_calib) + self.assertIsInstance(result.exception, ValueError) + self.assertEqual(result.exception.args, (msg, )) + + def test_exceptions_write_and_log(self): + """Check that the task logs the exception appropriately and also + returns some results and the exception. + """ + # Return some output and a log message with no exception. + self.config.on_exception = "write_and_log" + calibrate = CalibrateImageTask(config=self.config) + calibrate.astrometry.setRefObjLoader(self.ref_loader) + calibrate.photometry.match.setRefObjLoader(self.ref_loader) + with mock.patch.object(calibrate, "astrometry", + spec=lsst.meas.astrom.AstrometryTask) as patch: + msg = "mocked astrometry exception" + patch.run.side_effect = ValueError(msg) + with lsst.log.UsePythonLogging(): # so that assertLogs works with lsst.log + with self.assertLogs("lsst.calibrateImage", level="ERROR") as cm: + result = calibrate.run(exposure=self.exposure) + self.assertIn(msg, "\n".join(cm.output)) + self.assertIn("writing available datasets", "\n".join(cm.output)) + self.assertIsNotNone(result.output_exposure) + self.assertIsNotNone(result.psf_stars) + self.assertIsNotNone(result.background) + self.assertIsNotNone(result.stars) + self.assertIsNone(result.astrometry_matches) + self.assertIsNone(result.photometry_matches) + self.assertIsNone(result.applied_photo_calib) + self.assertIsInstance(result.exception, ValueError) + self.assertEqual(result.exception.args, (msg, )) + class CalibrateImageTaskRunQuantumTests(lsst.utils.tests.TestCase): """Tests of ``CalibrateImageTask.runQuantum``, which need a test butler,