Skip to content

Commit

Permalink
Merge pull request #318 from lsst/tickets/DM-40242
Browse files Browse the repository at this point in the history
DM-40242: skip existence checks for quantum inputs that have datastore records
  • Loading branch information
TallJimbo authored Jan 2, 2025
2 parents e4484fe + 7fe3f9d commit 2ebd24b
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 2 deletions.
3 changes: 3 additions & 0 deletions doc/changes/DM-40242.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Input datasets that were already found to exist during quantum graph (QG) generation will no longer be re-checked for existence during execution.

This mitigates a problem in which butler misconfiguration (e.g. datastore name mismatches) would lead to hard-to-spot `NoWorkFound` conditions in the first step in a pipeline. Those errors should now result in a `FileNotFoundError` with more helpful information.
25 changes: 24 additions & 1 deletion python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def from_pipeline_filename(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an on-disk
pipeline YAML file.
Expand All @@ -172,6 +173,10 @@ def from_pipeline_filename(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.
Returns
-------
Expand All @@ -188,6 +193,7 @@ def from_pipeline_filename(
bind=bind,
resources=resources,
raise_on_partial_outputs=raise_on_partial_outputs,
attach_datastore_records=attach_datastore_records,
)

@classmethod
Expand All @@ -202,6 +208,7 @@ def from_task_class(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from a pipeline
containing a single task.
Expand Down Expand Up @@ -231,6 +238,10 @@ def from_task_class(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.
Returns
-------
Expand All @@ -257,6 +268,7 @@ def from_task_class(
bind=bind,
resources=resources,
raise_on_partial_outputs=raise_on_partial_outputs,
attach_datastore_records=attach_datastore_records,
)

@classmethod
Expand All @@ -269,6 +281,7 @@ def from_pipeline(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline.
Expand All @@ -294,6 +307,10 @@ def from_pipeline(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.
Returns
-------
Expand All @@ -310,6 +327,7 @@ def from_pipeline(
butler=butler,
resources=resources,
raise_on_partial_outputs=raise_on_partial_outputs,
attach_datastore_records=attach_datastore_records,
)

@classmethod
Expand All @@ -322,6 +340,7 @@ def from_pipeline_graph(
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
attach_datastore_records: bool = False,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline graph.
Expand All @@ -348,6 +367,10 @@ def from_pipeline_graph(
`lsst.pipe.base.AnnotatedPartialOutputError` immediately, instead
of considering the partial result a success and continuing to run
downstream tasks.
attach_datastore_records : `bool`, optional
Whether to attach datastore records to the quantum graph. This is
usually unnecessary, unless the executor is used to test behavior
that depends on datastore records.
Returns
-------
Expand All @@ -359,7 +382,7 @@ def from_pipeline_graph(
quantum_graph_builder = AllDimensionsQuantumGraphBuilder(
pipeline_graph, butler, where=where, bind=bind
)
quantum_graph = quantum_graph_builder.build(attach_datastore_records=False)
quantum_graph = quantum_graph_builder.build(attach_datastore_records=attach_datastore_records)
return cls(
quantum_graph=quantum_graph,
butler=butler,
Expand Down
46 changes: 45 additions & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,23 @@ def updatedQuantumInputs(
task_node.label,
quantum.dataId,
)
toCheck = []
newRefsForDatasetType = updatedInputs[key]
stored = limited_butler.stored_many(refsForDatasetType)
for ref in refsForDatasetType:
if self._should_assume_exists(quantum, ref):
newRefsForDatasetType.append(ref)
else:
toCheck.append(ref)
if not toCheck:
_LOG.debug(
"Assuming overall input '%s' is present without checks for label=%s dataId=%s.",
key.name,
task_node.label,
quantum.dataId,
)
continue
stored = limited_butler.stored_many(toCheck)
for ref in toCheck:
if stored[ref]:
newRefsForDatasetType.append(ref)
else:
Expand Down Expand Up @@ -567,3 +581,33 @@ def initGlobals(self, quantum: Quantum) -> None:
else:
oneInstrument = instrument
Instrument.fromName(instrument, self.butler.registry)

def _should_assume_exists(self, quantum: Quantum, ref: DatasetRef) -> bool | None:
    """Decide whether an input dataset's existence check can be skipped
    because the quantum already carries datastore records for it.

    If this returns `True` for a dataset that does not in fact exist
    anymore, that is an unexpected problem that should surface as an
    exception, and definitely not a case where some predicted output just
    was not produced.  The two situations cannot always be told apart, but
    here they can.

    Parameters
    ----------
    quantum : `Quantum`
        Quantum being processed.
    ref : `lsst.daf.butler.DatasetRef`
        Reference to the input dataset.

    Returns
    -------
    exists : `bool` or `None`
        `True` if at least one of the quantum's datastore records contains
        ``ref.id``; `False` if the quantum has datastore records but none
        of them contains ``ref.id``; `None` if the quantum has no
        datastore records at all, so the answer cannot be determined.
    """
    records_by_datastore = quantum.datastore_records
    # No records attached (missing or empty mapping): nothing to go on.
    if not records_by_datastore:
        return None
    # Stops at the first datastore whose records mention this dataset.
    return any(ref.id in record_data.records for record_data in records_by_datastore.values())
49 changes: 49 additions & 0 deletions tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,55 @@ def test_partial_outputs_failure(self):
self.assertFalse(self.butler.exists("intermediate"))
self.assertFalse(self.butler.exists("output"))

def test_existence_check_skips(self):
    """Test that pre-execution existence checks are not performed for
    overall-input datasets, as those checks could otherwise mask
    repository configuration problems or downtime as NoWorkFound cases.
    """
    # First we configure and execute task A, which is just a way to get a
    # MockDataset in the repo for us to play with; the important test can't
    # use the non-mock 'input' dataset because the mock runQuantum only
    # actually reads MockDatasets.
    config_a = DynamicTestPipelineTaskConfig()
    config_a.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="input", storage_class="StructuredDataDict", mock_storage_class=False
    )
    config_a.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict"
    )
    executor_a = SimplePipelineExecutor.from_task_class(
        DynamicTestPipelineTask,
        config=config_a,
        butler=self.butler,
        label="a",
    )
    executor_a.run(register_dataset_types=True)
    # Now we can do the real test.  Task B reads the 'intermediate' dataset
    # that task A just wrote and writes 'output'.
    config_b = DynamicTestPipelineTaskConfig()
    config_b.inputs["i"] = DynamicConnectionConfig(
        dataset_type_name="intermediate", storage_class="StructuredDataDict", minimum=0
    )
    config_b.outputs["o"] = DynamicConnectionConfig(
        dataset_type_name="output", storage_class="StructuredDataDict"
    )
    # Task B's executor gets a clone of the test butler whose run is
    # "new_run".
    butler = self.butler.clone(run="new_run")
    # Datastore records are attached to the quantum graph here because the
    # behavior under test depends on them.
    executor_b = SimplePipelineExecutor.from_task_class(
        DynamicTestPipelineTask,
        config=config_b,
        butler=butler,
        label="b",
        attach_datastore_records=True,
    )
    # Delete the input dataset after the QG has already been built.
    intermediate_refs = butler.query_datasets("intermediate")
    self.assertEqual(len(intermediate_refs), 1)
    butler.pruneDatasets(intermediate_refs, purge=True, unstore=True)
    with self.assertRaises(FileNotFoundError):
        # We should get an exception rather than NoWorkFound, because for
        # this single-task pipeline, the missing dataset is an
        # overall-input (name notwithstanding).
        executor_b.run(register_dataset_types=True)


class MemoryTester(lsst.utils.tests.MemoryTestCase):
"""Generic tests for file leaks."""
Expand Down

0 comments on commit 2ebd24b

Please sign in to comment.