Skip to content

Commit

Permalink
Working, but needs tidying - Jim, help!
Browse files Browse the repository at this point in the history
  • Loading branch information
mfisherlevine committed Mar 15, 2024
1 parent a2f6597 commit 95fe125
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,6 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
RuntimeError
Raised if some outputs exist and some not.
"""
if not self.butler:
# Skip/prune logic only works for full butler.
return False

if self.skipExisting:
_LOG.debug(
"Checking existence of metadata from previous execution of label=%s dataId=%s.",
Expand All @@ -333,7 +329,7 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
_LOG.debug(
"Looking for existing outputs in the way for label=%s dataId=%s.", taskDef.label, quantum.dataId
)
ref_dict = self.butler.stored_many(chain.from_iterable(quantum.outputs.values()))
ref_dict = limited_butler.stored_many(chain.from_iterable(quantum.outputs.values()))
existingRefs = [ref for ref, exists in ref_dict.items() if exists]
missingRefs = [ref for ref, exists in ref_dict.items() if not exists]
if existingRefs:
Expand All @@ -343,26 +339,28 @@ def checkExistingOutputs(self, quantum: Quantum, taskDef: TaskDef, limited_butle
return True
elif self.clobberOutputs:
_LOG.info("Removing complete outputs for quantum %s: %s", quantum, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
else:
run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler'
raise RuntimeError(
f"Complete outputs exists for a quantum {quantum} "
"and neither clobberOutputs nor skipExisting is set: "
f"collection={self.butler.run} existingRefs={existingRefs}"
f"collection={run} existingRefs={existingRefs}"
)
else:
# Partial outputs from a failed quantum.
run = self.butler.run if hasattr(self.butler, 'run') else 'run is n/a on a limited butler'
_LOG.debug(
"Partial outputs exist for quantum %s collection=%s existingRefs=%s missingRefs=%s",
quantum,
self.butler.run,
run,
existingRefs,
missingRefs,
)
if self.clobberOutputs:
# only prune
_LOG.info("Removing partial outputs for task %s: %s", taskDef, existingRefs)
self.butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
limited_butler.pruneDatasets(existingRefs, disassociate=True, unstore=True, purge=True)
return False
else:
raise RuntimeError(
Expand Down

0 comments on commit 95fe125

Please sign in to comment.