Skip to content

Commit

Permalink
Handle exceptions in CalibrateImage
Browse files Browse the repository at this point in the history
Use the new Task exceptions and a pre-defined results struct to manage partial
outputs for failures of CalibrateImage subtasks.
  • Loading branch information
parejkoj committed Feb 8, 2024
1 parent 410a08b commit acb9fdc
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 26 deletions.
72 changes: 50 additions & 22 deletions python/lsst/pipe/tasks/calibrateImage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class CalibrateImageConnections(pipeBase.PipelineTaskConnections,
storageClass="SourceCatalog",
)

# TODO: We want some kind of flag on Exposures/Catalogs to make it obvious
# which components had failed to be computed/persisted
# TODO DM-38732: We want some kind of flag on Exposures/Catalogs to make
# it obvious which components had failed to be computed/persisted.
output_exposure = connectionTypes.Output(
doc="Photometrically calibrated exposure with fitted calibrations and summary statistics.",
name="initial_pvi",
Expand Down Expand Up @@ -398,6 +398,7 @@ def __init__(self, initial_stars_schema=None, **kwargs):

def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
exposures = inputs.pop("exposures")

astrometry_loader = lsst.meas.algorithms.ReferenceObjectLoader(
dataIds=[ref.datasetRef.dataId for ref in inputRefs.astrometry_ref_cat],
Expand All @@ -413,12 +414,31 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
config=self.config.photometry_ref_loader, log=self.log)
self.photometry.match.setRefObjLoader(photometry_loader)

outputs = self.run(**inputs)

butlerQC.put(outputs, outputRefs)
# This should not happen with a properly configured execution context.
assert not inputs, "runQuantum got more inputs than expected"

result = pipeBase.Struct(exposure=None,
stars=None,
background=None,
psf_stars=None,
applied_photo_calib=None,
astrometry_matches=None,
photometry_matches=None,
)
try:
self.run(exposures=exposures, result=result)
except lsst.pipe.base.RepeatableQuantumError as e:
error = pipeBase.AnnotatedPartialOutputsError.annotate(
e,
exposures=[result.exposure],
catalogs=[result.psf_stars, result.stars],
task=self
)
butlerQC.put(result, outputRefs)
raise error from e

@timeMethod
def run(self, *, exposures):
def run(self, *, exposures, result=None):
"""Find stars and perform psf measurement, then do a deeper detection
and measurement and calibrate astrometry and photometry from that.
Expand Down Expand Up @@ -458,30 +478,38 @@ def run(self, *, exposures):
Reference catalog stars matches used in the photometric fit.
(`list` [`lsst.afw.table.ReferenceMatch`] or `lsst.afw.table.BaseCatalog`)
"""
exposure = self._handle_snaps(exposures)

psf_stars, background, candidates = self._compute_psf(exposure)
if result is None:
result = pipeBase.Struct(output_exposure=None,
stars=None,
background=None,
psf_stars=None,
applied_photo_calib=None,
astrometry_matches=None,
photometry_matches=None,
)

self._measure_aperture_correction(exposure, psf_stars)
result.output_exposure = self._handle_snaps(exposures)

stars = self._find_stars(exposure, background)
result.psf_stars, result.background, candidates = self._compute_psf(result.output_exposure)

astrometry_matches, astrometry_meta = self._fit_astrometry(exposure, stars)
stars, photometry_matches, photometry_meta, photo_calib = self._fit_photometry(exposure, stars)
self._measure_aperture_correction(result.output_exposure, result.psf_stars)

self._summarize(exposure, stars, background)
stars = self._find_stars(result.output_exposure, result.background)
result.stars = stars

astrometry_matches, astrometry_meta = self._fit_astrometry(result.output_exposure, stars)
if self.config.optional_outputs:
result.astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches,
astrometry_meta)
result.stars, photometry_matches, \
photometry_meta, result.applied_photo_calib = self._fit_photometry(result.output_exposure,
result.stars)
if self.config.optional_outputs:
astrometry_matches = lsst.meas.astrom.denormalizeMatches(astrometry_matches, astrometry_meta)
photometry_matches = lsst.meas.astrom.denormalizeMatches(photometry_matches, photometry_meta)

return pipeBase.Struct(output_exposure=exposure,
stars=stars,
psf_stars=psf_stars,
background=background,
applied_photo_calib=photo_calib,
astrometry_matches=astrometry_matches,
photometry_matches=photometry_matches)
self._summarize(result.output_exposure, stars, result.background)

return result

def _handle_snaps(self, exposure):
"""Combine two snaps into one exposure, or return a single exposure.
Expand Down
75 changes: 71 additions & 4 deletions tests/test_calibrateImage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import unittest
from unittest import mock
import tempfile

import astropy.units as u
Expand Down Expand Up @@ -139,7 +140,7 @@ def _check_run(self, calibrate, result, *, photo_calib):
# Returned photoCalib should be the applied value, not the ==1 one on the exposure.
self.assertFloatsAlmostEqual(result.applied_photo_calib.getCalibrationMean(),
photo_calib, rtol=2e-3)
# Should have flux/magnitudes in the catalog.
# Should have calibrated flux/magnitudes in the catalog.
self.assertIn("slot_PsfFlux_flux", result.stars.schema)
self.assertIn("slot_PsfFlux_mag", result.stars.schema)

Expand Down Expand Up @@ -322,6 +323,72 @@ def test_match_psf_stars(self):
# Too few sources to reserve any in these tests.
self.assertEqual(stars["calib_psf_reserved"].sum(), 0)

def test_exceptions_raise(self):
"""Check that the task raises immediately on errors.
"""
self.config.on_exception = "raise"
calibrate = CalibrateImageTask(config=self.config)
calibrate.astrometry.setRefObjLoader(self.ref_loader)
calibrate.photometry.match.setRefObjLoader(self.ref_loader)
with mock.patch.object(calibrate, "psf_detection",
spec=lsst.meas.algorithms.SourceDetectionTask) as patch:
msg = "mocked detection exception"
patch.run.side_effect = ValueError(msg)
# Should only raise the inner exception, no other.
with self.assertRaisesRegex(ValueError, msg):
calibrate.run(exposure=self.exposure)
# TODO: check that the exception has information about where it was raised in the payload.

def test_exceptions_write_and_raise(self):
"""Check that the task returns some results and the exception.
"""
self.config.on_exception = "write_and_raise"
calibrate = CalibrateImageTask(config=self.config)
calibrate.astrometry.setRefObjLoader(self.ref_loader)
calibrate.photometry.match.setRefObjLoader(self.ref_loader)
with mock.patch.object(calibrate, "star_deblend",
spec=lsst.meas.deblender.SourceDeblendTask) as patch:
msg = "mocked deblend exception"
patch.run.side_effect = ValueError(msg)
result = calibrate.run(exposure=self.exposure)
self.assertIsNotNone(result.output_exposure)
self.assertIsNotNone(result.psf_stars)
self.assertIsNotNone(result.background)
self.assertIsNone(result.stars)
self.assertIsNone(result.astrometry_matches)
self.assertIsNone(result.photometry_matches)
self.assertIsNone(result.applied_photo_calib)
self.assertIsInstance(result.exception, ValueError)
self.assertEqual(result.exception.args, (msg, ))

def test_exceptions_write_and_log(self):
"""Check that the task logs the exception appropriately and also
returns some results and the exception.
"""
# Return some output and a log message with no exception.
self.config.on_exception = "write_and_log"
calibrate = CalibrateImageTask(config=self.config)
calibrate.astrometry.setRefObjLoader(self.ref_loader)
calibrate.photometry.match.setRefObjLoader(self.ref_loader)
with mock.patch.object(calibrate, "astrometry",
spec=lsst.meas.astrom.AstrometryTask) as patch:
msg = "mocked astrometry exception"
patch.run.side_effect = ValueError(msg)
with lsst.log.UsePythonLogging(): # so that assertLogs works with lsst.log
with self.assertLogs("lsst.calibrateImage", level="ERROR") as cm:
result = calibrate.run(exposure=self.exposure)
self.assertIn(msg, "\n".join(cm.output))
self.assertIn("writing available datasets", "\n".join(cm.output))
self.assertIsNotNone(result.output_exposure)
self.assertIsNotNone(result.psf_stars)
self.assertIsNotNone(result.background)
self.assertIsNotNone(result.stars)
self.assertIsNone(result.astrometry_matches)
self.assertIsNone(result.photometry_matches)
self.assertIsNone(result.applied_photo_calib)
self.assertIsInstance(result.exception, ValueError)
self.assertEqual(result.exception.args, (msg, ))


class CalibrateImageTaskRunQuantumTests(lsst.utils.tests.TestCase):
"""Tests of ``CalibrateImageTask.runQuantum``, which need a test butler,
Expand Down Expand Up @@ -418,7 +485,7 @@ def test_runQuantum(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result"})

def test_runQuantum_2_snaps(self):
task = CalibrateImageTask()
Expand All @@ -445,7 +512,7 @@ def test_runQuantum_2_snaps(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result"})

def test_runQuantum_no_optional_outputs(self):
config = CalibrateImageTask.ConfigClass()
Expand All @@ -470,7 +537,7 @@ def test_runQuantum_no_optional_outputs(self):
self.assertEqual(task.astrometry.refObjLoader.name, "gaia_dr3_20230707")
self.assertEqual(task.photometry.match.refObjLoader.name, "ps1_pv3_3pi_20170110")
# Check that the proper kwargs are passed to run().
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures"})
self.assertEqual(mock_run.call_args.kwargs.keys(), {"exposures", "result"})

def test_lintConnections(self):
"""Check that the connections are self-consistent.
Expand Down

0 comments on commit acb9fdc

Please sign in to comment.