From 3774a40c0bf6347c5f0faa07edfa8fb38cd40be9 Mon Sep 17 00:00:00 2001 From: Merlin Fisher-Levine Date: Fri, 15 Mar 2024 11:45:45 -0700 Subject: [PATCH] Update checkExistingOutputs to work with a LimitedButler --- .../lsst/ctrl/mpexec/singleQuantumExecutor.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index c7dbad9d..8edc88aa 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -290,6 +290,9 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle If only partial outputs exist then they are removed if ``clobberOutputs`` is True, otherwise an exception is raised. + The ``LimitedButler`` is used for everything, and should be set to + ``self.butler`` if no separate ``LimitedButler`` is available. + Parameters ---------- quantum : `~lsst.daf.butler.Quantum` @@ -312,10 +315,6 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle RuntimeError Raised if some outputs exist and some not. """ - if not self.butler: - # Skip/prune logic only works for full butler. - return False - if self.skipExisting: _LOG.debug( "Checking existence of metadata from previous execution of label=%s dataId=%s.", @@ -333,7 +332,7 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle _LOG.debug( "Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId ) - ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values())) + ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values())) existingRefs = [ref for ref, exists in ref_dict.items() if exists] missingRefs = [ref for ref, exists in ref_dict.items() if not exists] if existingRefs: @@ -343,26 +342,28 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle return True elif self.clobberOutputs: _LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs) - self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) else: + run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler' raise RuntimeError( f"Complete outputs exists for a quantum {quantum} " "and neither clobberOutputs nor skipExisting is set: " - f"collection={self.butler.run} existingRefs={existingRefs}" + f"collection={run} existingRefs={existingRefs}" ) else: # Partial outputs from a failed quantum. + run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler' _LOG.debug( "Partial outputs exist for quantum %s collection=%s existingRefs=%s missingRefs=%s", quantum, - self.butler.run, + run, existingRefs, missingRefs, ) if self.clobberOutputs: # only prune _LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs) - self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) return False else: raise RuntimeError(