diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py
index 8961282da..5884b94f4 100644
--- a/python/lsst/pipe/base/quantum_graph_builder.py
+++ b/python/lsst/pipe/base/quantum_graph_builder.py
@@ -180,7 +180,6 @@ def __init__(
         self.metadata = TaskMetadata()
         self._pipeline_graph = pipeline_graph
         self.butler = butler
-        self._pipeline_graph.resolve(self.butler.registry)
         if input_collections is None:
             input_collections = butler.collections
         if not input_collections:
@@ -233,11 +232,13 @@ def __init__(
                 packages_storage_class,
             )
         }
-        self._find_empty_dimension_datasets()
-        self.prerequisite_info = {
-            task_node.label: PrerequisiteInfo(task_node, self._pipeline_graph)
-            for task_node in pipeline_graph.tasks.values()
-        }
+        with self.butler.registry.caching_context():
+            self._pipeline_graph.resolve(self.butler.registry)
+            self._find_empty_dimension_datasets()
+            self.prerequisite_info = {
+                task_node.label: PrerequisiteInfo(task_node, self._pipeline_graph)
+                for task_node in pipeline_graph.tasks.values()
+            }
 
     log: LsstLogAdapter
     """Logger to use for all quantum-graph generation messages.
@@ -339,42 +340,44 @@ def build(self, metadata: Mapping[str, Any] | None = None) -> QuantumGraph:
         call this method exactly once. See class documentation for details
         on what it does.
         """
-        full_skeleton = QuantumGraphSkeleton(self._pipeline_graph.tasks)
-        subgraphs = list(self._pipeline_graph.split_independent())
-        for i, subgraph in enumerate(subgraphs):
-            self.log.info(
-                "Processing pipeline subgraph %d of %d with %d task(s).",
-                i + 1,
-                len(subgraphs),
-                len(subgraph.tasks),
-            )
-            self.log.verbose("Subgraph tasks: [%s]", ", ".join(label for label in subgraph.tasks))
-            subgraph_skeleton = self.process_subgraph(subgraph)
-            full_skeleton.update(subgraph_skeleton)
-        # Loop over tasks. The pipeline graph must be topologically sorted,
-        # so a quantum is only processed after any quantum that provides its
-        # inputs has been processed.
-        for task_node in self._pipeline_graph.tasks.values():
-            self._resolve_task_quanta(task_node, full_skeleton)
-        # Add global init-outputs to the skeleton.
-        for dataset_type in self._global_init_output_types.values():
-            dataset_key = full_skeleton.add_dataset_node(
-                dataset_type.name, self.empty_data_id, is_global_init_output=True
-            )
-            ref = self.existing_datasets.outputs_in_the_way.get(dataset_key)
-            if ref is None:
-                ref = DatasetRef(dataset_type, self.empty_data_id, run=self.output_run)
-            full_skeleton[dataset_key]["ref"] = ref
-        # Remove dataset nodes with no edges that are not global init outputs,
-        # which are generally overall-inputs whose original quanta end up
-        # skipped or with no work to do (we can't remove these along with the
-        # quanta because no quantum knows if its the only consumer).
-        full_skeleton.remove_orphan_datasets()
-        self._attach_datastore_records(full_skeleton)
-        # TODO initialize most metadata here instead of in ctrl_mpexec.
-        if metadata is None:
-            metadata = {}
-        return self._construct_quantum_graph(full_skeleton, metadata)
+        with self.butler.registry.caching_context():
+            full_skeleton = QuantumGraphSkeleton(self._pipeline_graph.tasks)
+            subgraphs = list(self._pipeline_graph.split_independent())
+            for i, subgraph in enumerate(subgraphs):
+                self.log.info(
+                    "Processing pipeline subgraph %d of %d with %d task(s).",
+                    i + 1,
+                    len(subgraphs),
+                    len(subgraph.tasks),
+                )
+                self.log.verbose("Subgraph tasks: [%s]", ", ".join(label for label in subgraph.tasks))
+                subgraph_skeleton = self.process_subgraph(subgraph)
+                full_skeleton.update(subgraph_skeleton)
+            # Loop over tasks. The pipeline graph must be topologically
+            # sorted, so a quantum is only processed after any quantum that
+            # provides its inputs has been processed.
+            for task_node in self._pipeline_graph.tasks.values():
+                self._resolve_task_quanta(task_node, full_skeleton)
+            # Add global init-outputs to the skeleton.
+            for dataset_type in self._global_init_output_types.values():
+                dataset_key = full_skeleton.add_dataset_node(
+                    dataset_type.name, self.empty_data_id, is_global_init_output=True
+                )
+                ref = self.existing_datasets.outputs_in_the_way.get(dataset_key)
+                if ref is None:
+                    ref = DatasetRef(dataset_type, self.empty_data_id, run=self.output_run)
+                full_skeleton[dataset_key]["ref"] = ref
+            # Remove dataset nodes with no edges that are not global init
+            # outputs, which are generally overall-inputs whose original quanta
+            # end up skipped or with no work to do (we can't remove these along
+            # with the quanta because no quantum knows if its the only
+            # consumer).
+            full_skeleton.remove_orphan_datasets()
+            self._attach_datastore_records(full_skeleton)
+            # TODO initialize most metadata here instead of in ctrl_mpexec.
+            if metadata is None:
+                metadata = {}
+            return self._construct_quantum_graph(full_skeleton, metadata)
 
     @abstractmethod
     def process_subgraph(self, subgraph: PipelineGraph) -> QuantumGraphSkeleton:
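
Note (not part of the patch): the change moves the constructor's graph-resolution work and the whole of build under a single Registry.caching_context(), so repeated collection and dataset-type lookups hit an in-memory cache rather than issuing separate registry queries. Below is a minimal sketch of the same pattern outside the builder; the function name and the particular queries are illustrative assumptions, not taken from this patch.

    from lsst.daf.butler import Butler

    def summarize_registry(butler: Butler) -> tuple[list[str], list[str]]:
        # Hypothetical helper: run several registry queries under one caching
        # context, mirroring how the builder now groups resolution and
        # skeleton construction into a single cached phase.
        with butler.registry.caching_context():
            collections = list(butler.registry.queryCollections())
            dataset_types = [dt.name for dt in butler.registry.queryDatasetTypes()]
        return collections, dataset_types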