From 95fe125feffddbe7e317e6ad06d74190e6392f14 Mon Sep 17 00:00:00 2001 From: Merlin Fisher-Levine Date: Fri, 15 Mar 2024 11:45:45 -0700 Subject: [PATCH] Working, but needs tidying - Jim, help! --- python/lsst/ctrl/mpexec/singleQuantumExecutor.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index c7dbad9d..418a882f 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -312,10 +312,6 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle RuntimeError Raised if some outputs exist and some not. """ - if not self.butler: - # Skip/prune logic only works for full butler. - return False - if self.skipExisting: _LOG.debug( "Checking existence of metadata from previous execution of label=%s dataId=%s.", @@ -333,7 +329,7 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle _LOG.debug( "Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId ) - ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values())) + ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values())) existingRefs = [ref for ref, exists in ref_dict.items() if exists] missingRefs = [ref for ref, exists in ref_dict.items() if not exists] if existingRefs: @@ -343,26 +339,28 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle return True elif self.clobberOutputs: _LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs) - self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) else: + run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler' raise RuntimeError( f"Complete outputs exists for a quantum {quantum} " "and neither clobberOutputs nor skipExisting is set: " - f"collection={self.butler.run} existingRefs={existingRefs}" + f"collection={run} existingRefs={existingRefs}" ) else: # Partial outputs from a failed quantum. + run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler' _LOG.debug( "Partial outputs exist for quantum %s collection=%s existingRefs=%s missingRefs=%s", quantum, - self.butler.run, + run, existingRefs, missingRefs, ) if self.clobberOutputs: # only prune _LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs) - self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) + limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True) return False else: raise RuntimeError(