From ffd2e4c6dea98181964520ed833c45780193a364 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Fri, 1 Nov 2024 12:39:03 -0700 Subject: [PATCH 1/6] Experimental command to copy files based on a graph --- python/lsst/pipe/base/cli/cmd/__init__.py | 4 +- python/lsst/pipe/base/cli/cmd/commands.py | 61 +++++++- python/lsst/pipe/base/cli/resources.yaml | 1 + python/lsst/pipe/base/script/__init__.py | 1 + .../script/retrieve_artifacts_for_quanta.py | 139 ++++++++++++++++++ 5 files changed, 203 insertions(+), 3 deletions(-) create mode 100644 python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py diff --git a/python/lsst/pipe/base/cli/cmd/__init__.py b/python/lsst/pipe/base/cli/cmd/__init__.py index 546af6bf9..a2c4c28a9 100644 --- a/python/lsst/pipe/base/cli/cmd/__init__.py +++ b/python/lsst/pipe/base/cli/cmd/__init__.py @@ -25,6 +25,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph"] +__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph", "retrieve_artifacts_for_quanta"] -from .commands import register_instrument, transfer_from_graph, zip_from_graph +from .commands import register_instrument, retrieve_artifacts_for_quanta, transfer_from_graph, zip_from_graph diff --git a/python/lsst/pipe/base/cli/cmd/commands.py b/python/lsst/pipe/base/cli/cmd/commands.py index f87670984..c6858d0e0 100644 --- a/python/lsst/pipe/base/cli/cmd/commands.py +++ b/python/lsst/pipe/base/cli/cmd/commands.py @@ -34,8 +34,9 @@ register_dataset_types_option, repo_argument, transfer_dimensions_option, + transfer_option, ) -from lsst.daf.butler.cli.utils import ButlerCommand +from lsst.daf.butler.cli.utils import ButlerCommand, split_commas, unwrap from ... import script from ..opt import instrument_argument, update_output_chain_option @@ -92,3 +93,61 @@ def zip_from_graph(**kwargs: Any) -> None: """ zip = script.zip_from_graph(**kwargs) print(f"Zip archive written to {zip}") + + +@click.command(short_help="Retrieve artifacts from subset of graph.", cls=ButlerCommand) +@click.argument("graph", required=True) +@repo_argument( + required=True, + help="REPO is a URI to a butler configuration that is used to configure " + "the datastore of the quantum-backed butler.", +) +@click.argument("dest", required=True) +@transfer_option() +@click.option( + "--preserve-path/--no-preserve-path", + is_flag=True, + default=True, + help="Preserve the datastore path to the artifact at the destination.", +) +@click.option( + "--clobber/--no-clobber", + is_flag=True, + default=False, + help="If clobber, overwrite files if they exist locally.", +) +@click.option( + "--qgraph-node-id", + callback=split_commas, + multiple=True, + help=unwrap( + """Only load a specified set of nodes when graph is + loaded from a file, nodes are identified by UUID + values. One or more comma-separated strings are + accepted. By default all nodes are loaded. Ignored if + graph is not loaded from a file.""" + ), +) +@click.option( + "--include-inputs/--no-include-inputs", + is_flag=True, + default=True, + help="Whether to include input datasets in retrieval.", +) +@click.option( + "--include-outputs/--no-include-outputs", + is_flag=True, + default=True, + help="Whether to include outut datasets in retrieval.", +) +@options_file_option() +def retrieve_artifacts_for_quanta(**kwargs: Any) -> None: + """Retrieve artifacts from given quanta defined in quantum graph. + + GRAPH is a URI to the source quantum graph file to use when building the + Zip archive. + + DEST is a directory to write the Zip archive. + """ + artifacts = script.retrieve_artifacts_for_quanta(**kwargs) + print(f"Written {len(artifacts)} artifacts to {kwargs['dest']}.") diff --git a/python/lsst/pipe/base/cli/resources.yaml b/python/lsst/pipe/base/cli/resources.yaml index 9dedebeed..6a166be64 100644 --- a/python/lsst/pipe/base/cli/resources.yaml +++ b/python/lsst/pipe/base/cli/resources.yaml @@ -4,3 +4,4 @@ cmd: - register-instrument - transfer-from-graph - zip-from-graph + - retrieve-artifacts-for-quanta diff --git a/python/lsst/pipe/base/script/__init__.py b/python/lsst/pipe/base/script/__init__.py index f197d49c9..e648c0ec3 100644 --- a/python/lsst/pipe/base/script/__init__.py +++ b/python/lsst/pipe/base/script/__init__.py @@ -26,5 +26,6 @@ # along with this program. If not, see . from .register_instrument import register_instrument +from .retrieve_artifacts_for_quanta import retrieve_artifacts_for_quanta from .transfer_from_graph import transfer_from_graph from .zip_from_graph import zip_from_graph diff --git a/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py new file mode 100644 index 000000000..b62bdeb49 --- /dev/null +++ b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py @@ -0,0 +1,139 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__all__ = ["retrieve_artifacts_for_quanta"] + +import logging + +from lsst.daf.butler import DatasetRef, QuantumBackedButler +from lsst.daf.butler.datastore.record_data import DatastoreRecordData +from lsst.pipe.base import QuantumGraph +from lsst.resources import ResourcePath + +_LOG = logging.getLogger(__name__) + + +def retrieve_artifacts_for_quanta( + graph: str, + repo: str, + dest: str, + transfer: str, + preserve_path: bool, + clobber: bool, + qgraph_node_id: list[str], + include_inputs: bool, + include_outputs: bool, +) -> list[ResourcePath]: + """Retrieve artifacts from a graph and store locally. + + Parameters + ---------- + graph : `str` + URI string of the quantum graph. + repo : `str` + URI string of the Butler repo to use. + dest : `str` + URI string of the directory to write the artifacts. + transfer : `str` + Transfer mode to use when placing artifacts in the destination. + preserve_path : `bool` + If `True` the full datastore path will be retained within the + destination directory, else only the filename will be used. + clobber : `bool` + If `True` allow transfers to overwrite files at the destination. + qgraph_node_id : `tuple` [ `str` ] + Quanta to extract. + include_inputs : `bool` + Whether to include input datasets in retrieval. + include_outputs : `bool` + Whether to include output datasets in retrieval. + + Returns + ------- + paths : `list` [ `lsst.resources.ResourcePath` ] + The paths to the artifacts that were written. + """ + # Read graph into memory. + nodes = qgraph_node_id or None + qgraph = QuantumGraph.loadUri(graph, nodes=nodes) + + # Get data repository definitions from the QuantumGraph; these can have + # different storage classes than those in the quanta. + dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} + + datastore_records: dict[str, DatastoreRecordData] = {} + refs: set[DatasetRef] = set() + if include_inputs: + # Collect input refs used by this graph. + for task_def in qgraph.iterTaskGraph(): + if in_refs := qgraph.initInputRefs(task_def): + refs.update(in_refs) + for qnode in qgraph: + for otherRefs in qnode.quantum.inputs.values(): + refs.update(otherRefs) + for store_name, records in qnode.quantum.datastore_records.items(): + datastore_records.setdefault(store_name, DatastoreRecordData()).update(records) + n_inputs = len(refs) + if n_inputs: + _LOG.info("Found %d input dataset%s.", n_inputs, "" if n_inputs == 1 else "s") + + if include_outputs: + # Collect output refs that could be created by this graph. + original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs()) + for task_def in qgraph.iterTaskGraph(): + if out_refs := qgraph.initOutputRefs(task_def): + original_output_refs.update(out_refs) + for qnode in qgraph: + for otherRefs in qnode.quantum.outputs.values(): + original_output_refs.update(otherRefs) + + # Convert output_refs to the data repository storage classes, too. + for ref in original_output_refs: + internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) + if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: + refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name)) + else: + refs.add(ref) + + n_outputs = len(refs) - n_inputs + if n_outputs: + _LOG.info("Found %d output dataset%s.", n_outputs, "" if n_outputs == 1 else "s") + + # Make QBB, its config is the same as output Butler. + qbb = QuantumBackedButler.from_predicted( + config=repo, + predicted_inputs=[ref.id for ref in refs], + predicted_outputs=[], + dimensions=qgraph.universe, + datastore_records=datastore_records, + dataset_types=dataset_types, + ) + + paths = qbb.retrieve_artifacts( + refs, dest, transfer=transfer, overwrite=clobber, preserve_path=preserve_path + ) + return paths From a69d8884e2a5e348af924d6711ed8ae167599ed7 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Fri, 1 Nov 2024 12:44:24 -0700 Subject: [PATCH 2/6] Fix type annotation for nodes parameter for loadUri --- python/lsst/pipe/base/graph/graph.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/lsst/pipe/base/graph/graph.py b/python/lsst/pipe/base/graph/graph.py index 0cce2b3d7..89cf60f71 100644 --- a/python/lsst/pipe/base/graph/graph.py +++ b/python/lsst/pipe/base/graph/graph.py @@ -909,7 +909,7 @@ def loadUri( cls, uri: ResourcePathExpression, universe: DimensionUniverse | None = None, - nodes: Iterable[uuid.UUID] | None = None, + nodes: Iterable[uuid.UUID | str] | None = None, graphID: BuildId | None = None, minimumVersion: int = 3, ) -> QuantumGraph: @@ -924,7 +924,7 @@ def loadUri( saved structure. If supplied, the `~lsst.daf.butler.DimensionUniverse` from the loaded `QuantumGraph` will be validated against the supplied argument for compatibility. - nodes : iterable of `uuid.UUID` or `None` + nodes : iterable of [ `uuid.UUID` | `str` ] or `None` UUIDs that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded. From 76d5ca021836438d2b13943b38f031f63b350fd6 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Fri, 1 Nov 2024 16:31:43 -0700 Subject: [PATCH 3/6] Add new QuantumGraph.get_refs method and use it in scripts --- python/lsst/pipe/base/graph/graph.py | 152 ++++++++++++++++++ .../script/retrieve_artifacts_for_quanta.py | 51 ++---- .../pipe/base/script/transfer_from_graph.py | 25 +-- .../lsst/pipe/base/script/zip_from_graph.py | 23 +-- tests/test_quantumGraph.py | 23 +++ 5 files changed, 192 insertions(+), 82 deletions(-) diff --git a/python/lsst/pipe/base/graph/graph.py b/python/lsst/pipe/base/graph/graph.py index 89cf60f71..373575381 100644 --- a/python/lsst/pipe/base/graph/graph.py +++ b/python/lsst/pipe/base/graph/graph.py @@ -44,6 +44,7 @@ from types import MappingProxyType from typing import Any, BinaryIO, TypeVar +import lsst.utils.logging import networkx as nx from lsst.daf.butler import ( Config, @@ -75,6 +76,7 @@ from .quantumNode import BuildId, QuantumNode _T = TypeVar("_T", bound="QuantumGraph") +_LOG = lsst.utils.logging.getLogger(__name__) # modify this constant any time the on disk representation of the save file # changes, and update the load helpers to behave properly for each version. @@ -1656,3 +1658,153 @@ def init_output_run(self, butler: LimitedButler, existing: bool = True) -> None: self.write_configs(butler, compare_existing=existing) self.write_packages(butler, compare_existing=existing) self.write_init_outputs(butler, skip_existing=existing) + + def get_refs( + self, + *, + include_init_inputs: bool = False, + include_inputs: bool = False, + include_intermediates: bool | None = None, + include_init_outputs: bool = False, + include_outputs: bool = False, + conform_outputs: bool = True, + ) -> tuple[set[DatasetRef], dict[str, DatastoreRecordData]]: + """Get the requested dataset refs from the graph. + + Parameters + ---------- + include_init_inputs : `bool`, optional + Include init inputs. + include_inputs : `bool`, optional + Include inputs. + include_intermediates : `bool` or `None`, optional + If `None`, no special handling for intermediates is performed. + If `True` intermediates are calculated even if other flags + do not request datasets. If `False` intermediates will be removed + from any results. + include_init_outputs : `bool`, optional + Include init outpus. + include_outputs : `bool`, optional + Include outputs. + conform_outputs : `bool`, optional + Whether any outputs found should have their dataset types conformed + with the registry dataset types. + + Returns + ------- + refs : `set` [ `lsst.daf.butler.DatasetRef` ] + The requested dataset refs found in the graph. + datastore_records : `dict` [ `str`, \ + `lsst.daf.butler.datastore.record_data.DatastoreRecordData` ] + Any datastore records found. + + Notes + ----- + Conforming and requesting inputs and outputs can result in the same + dataset appearing in the results twice with differing storage classes. + """ + datastore_records: dict[str, DatastoreRecordData] = {} + init_input_refs: set[DatasetRef] = set() + init_output_refs: set[DatasetRef] = set(self.globalInitOutputRefs()) + + if include_intermediates is True: + # Need to enable inputs and outputs even if not explicitly + # requested. + request_include_init_inputs = True + request_include_inputs = True + request_include_init_outputs = True + request_include_outputs = True + else: + request_include_init_inputs = include_init_inputs + request_include_inputs = include_inputs + request_include_init_outputs = include_init_outputs + request_include_outputs = include_outputs + + if request_include_init_inputs or request_include_init_outputs: + for task_def in self.iterTaskGraph(): + if request_include_init_inputs: + if in_refs := self.initInputRefs(task_def): + init_input_refs.update(in_refs) + if request_include_init_outputs: + if out_refs := self.initOutputRefs(task_def): + init_output_refs.update(out_refs) + + input_refs: set[DatasetRef] = set() + output_refs: set[DatasetRef] = set() + + for qnode in self: + if request_include_inputs: + for other_refs in qnode.quantum.inputs.values(): + input_refs.update(other_refs) + # Inputs can come with datastore records. + for store_name, records in qnode.quantum.datastore_records.items(): + datastore_records.setdefault(store_name, DatastoreRecordData()).update(records) + if request_include_outputs: + for other_refs in qnode.quantum.outputs.values(): + output_refs.update(other_refs) + + # Intermediates are the intersection of inputs and outputs. Must do + # this analysis before conforming since dataset type changes will + # change set membership. + inter_msg = "" + intermediates = set() + if include_intermediates is not None: + intermediates = (input_refs | init_input_refs) & (output_refs | init_output_refs) + + if include_intermediates is False: + # Remove intermediates from results. + init_input_refs -= intermediates + input_refs -= intermediates + init_output_refs -= intermediates + output_refs -= intermediates + inter_msg = f"; Intermediates removed: {len(intermediates)}" + intermediates = set() + elif include_intermediates is True: + # Do not mention intermediates if all the input/output flags + # would have resulted in them anyhow. + if ( + (request_include_init_inputs is not include_init_inputs) + or (request_include_inputs is not include_inputs) + or (request_include_init_outputs is not include_init_outputs) + or (request_include_outputs is not include_outputs) + ): + inter_msg = f"; including intermediates: {len(intermediates)}" + + # Assign intermediates to the relevant category. + if not include_init_inputs: + init_input_refs &= intermediates + if not include_inputs: + input_refs &= intermediates + if not include_init_outputs: + init_output_refs &= intermediates + if not include_outputs: + output_refs &= intermediates + + # Conforming can result in an input ref and an output ref appearing + # in the returned results that are identical apart from storage class. + if conform_outputs: + # Get data repository definitions from the QuantumGraph; these can + # have different storage classes than those in the quanta. + dataset_types = {dstype.name: dstype for dstype in self.registryDatasetTypes()} + + def _update_ref(ref: DatasetRef) -> DatasetRef: + internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) + if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: + ref = ref.overrideStorageClass(internal_dataset_type.storageClass_name) + return ref + + # Convert output_refs to the data repository storage classes, too. + output_refs = {_update_ref(ref) for ref in output_refs} + init_output_refs = {_update_ref(ref) for ref in init_output_refs} + + _LOG.verbose( + "Found the following datasets. InitInputs: %d; Inputs: %d; InitOutputs: %s; Outputs: %d%s", + len(init_input_refs), + len(input_refs), + len(init_output_refs), + len(output_refs), + inter_msg, + ) + + refs = input_refs | init_input_refs | init_output_refs | output_refs + return refs, datastore_records diff --git a/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py index b62bdeb49..fed44c01b 100644 --- a/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py +++ b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py @@ -29,8 +29,7 @@ import logging -from lsst.daf.butler import DatasetRef, QuantumBackedButler -from lsst.daf.butler.datastore.record_data import DatastoreRecordData +from lsst.daf.butler import QuantumBackedButler from lsst.pipe.base import QuantumGraph from lsst.resources import ResourcePath @@ -48,7 +47,7 @@ def retrieve_artifacts_for_quanta( include_inputs: bool, include_outputs: bool, ) -> list[ResourcePath]: - """Retrieve artifacts from a graph and store locally. + """Retrieve artifacts referenced in a graph and store locally. Parameters ---------- @@ -81,48 +80,18 @@ def retrieve_artifacts_for_quanta( nodes = qgraph_node_id or None qgraph = QuantumGraph.loadUri(graph, nodes=nodes) + refs, datastore_records = qgraph.get_refs( + include_inputs=include_inputs, + include_init_inputs=include_inputs, + include_outputs=include_outputs, + include_init_outputs=include_outputs, + conform_outputs=True, # Need to look for predicted outputs with correct storage class. + ) + # Get data repository definitions from the QuantumGraph; these can have # different storage classes than those in the quanta. dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} - datastore_records: dict[str, DatastoreRecordData] = {} - refs: set[DatasetRef] = set() - if include_inputs: - # Collect input refs used by this graph. - for task_def in qgraph.iterTaskGraph(): - if in_refs := qgraph.initInputRefs(task_def): - refs.update(in_refs) - for qnode in qgraph: - for otherRefs in qnode.quantum.inputs.values(): - refs.update(otherRefs) - for store_name, records in qnode.quantum.datastore_records.items(): - datastore_records.setdefault(store_name, DatastoreRecordData()).update(records) - n_inputs = len(refs) - if n_inputs: - _LOG.info("Found %d input dataset%s.", n_inputs, "" if n_inputs == 1 else "s") - - if include_outputs: - # Collect output refs that could be created by this graph. - original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs()) - for task_def in qgraph.iterTaskGraph(): - if out_refs := qgraph.initOutputRefs(task_def): - original_output_refs.update(out_refs) - for qnode in qgraph: - for otherRefs in qnode.quantum.outputs.values(): - original_output_refs.update(otherRefs) - - # Convert output_refs to the data repository storage classes, too. - for ref in original_output_refs: - internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) - if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: - refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name)) - else: - refs.add(ref) - - n_outputs = len(refs) - n_inputs - if n_outputs: - _LOG.info("Found %d output dataset%s.", n_outputs, "" if n_outputs == 1 else "s") - # Make QBB, its config is the same as output Butler. qbb = QuantumBackedButler.from_predicted( config=repo, diff --git a/python/lsst/pipe/base/script/transfer_from_graph.py b/python/lsst/pipe/base/script/transfer_from_graph.py index 68b672400..1a3456048 100644 --- a/python/lsst/pipe/base/script/transfer_from_graph.py +++ b/python/lsst/pipe/base/script/transfer_from_graph.py @@ -27,7 +27,7 @@ __all__ = ["transfer_from_graph"] -from lsst.daf.butler import Butler, CollectionType, DatasetRef, QuantumBackedButler, Registry +from lsst.daf.butler import Butler, CollectionType, QuantumBackedButler, Registry from lsst.daf.butler.registry import MissingCollectionError from lsst.pipe.base import QuantumGraph @@ -69,27 +69,10 @@ def transfer_from_graph( # Read whole graph into memory qgraph = QuantumGraph.loadUri(graph) - # Collect output refs that could be created by this graph. - original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs()) - for task_def in qgraph.iterTaskGraph(): - if refs := qgraph.initOutputRefs(task_def): - original_output_refs.update(refs) - for qnode in qgraph: - for otherRefs in qnode.quantum.outputs.values(): - original_output_refs.update(otherRefs) - - # Get data repository definitions from the QuantumGraph; these can have - # different storage classes than those in the quanta. - dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} + output_refs, _ = qgraph.get_refs(include_outputs=True, include_init_outputs=True, conform_outputs=True) - # Convert output_refs to the data repository storage classes, too. - output_refs = set() - for ref in original_output_refs: - internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) - if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: - output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name)) - else: - output_refs.add(ref) + # Get data repository dataset type definitions from the QuantumGraph. + dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} # Make QBB, its config is the same as output Butler. qbb = QuantumBackedButler.from_predicted( diff --git a/python/lsst/pipe/base/script/zip_from_graph.py b/python/lsst/pipe/base/script/zip_from_graph.py index ce36aedac..b5c16f9e5 100644 --- a/python/lsst/pipe/base/script/zip_from_graph.py +++ b/python/lsst/pipe/base/script/zip_from_graph.py @@ -30,7 +30,7 @@ import logging import re -from lsst.daf.butler import DatasetRef, QuantumBackedButler +from lsst.daf.butler import QuantumBackedButler from lsst.daf.butler.utils import globToRegex from lsst.pipe.base import QuantumGraph from lsst.resources import ResourcePath @@ -67,28 +67,11 @@ def zip_from_graph( # Read whole graph into memory qgraph = QuantumGraph.loadUri(graph) - # Collect output refs that could be created by this graph. - original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs()) - for task_def in qgraph.iterTaskGraph(): - if refs := qgraph.initOutputRefs(task_def): - original_output_refs.update(refs) - for qnode in qgraph: - for otherRefs in qnode.quantum.outputs.values(): - original_output_refs.update(otherRefs) + output_refs, _ = qgraph.get_refs(include_outputs=True, include_init_outputs=True, conform_outputs=True) - # Get data repository definitions from the QuantumGraph; these can have - # different storage classes than those in the quanta. + # Get data repository dataset type definitions from the QuantumGraph. dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} - # Convert output_refs to the data repository storage classes, too. - output_refs = set() - for ref in original_output_refs: - internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) - if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: - output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name)) - else: - output_refs.add(ref) - # Make QBB, its config is the same as output Butler. qbb = QuantumBackedButler.from_predicted( config=repo, diff --git a/tests/test_quantumGraph.py b/tests/test_quantumGraph.py index 84c1d7636..f495ac284 100644 --- a/tests/test_quantumGraph.py +++ b/tests/test_quantumGraph.py @@ -617,6 +617,29 @@ def testGetSummary(self) -> None: self.assertEqual(self.qGraph.graphID, summary.graphID) self.assertEqual(len(summary.qgraphTaskSummaries), len(self.qGraph.taskGraph)) + def test_get_refs(self) -> None: + """Test that dataset refs can be retrieved from graph.""" + refs, _ = self.qGraph.get_refs(include_inputs=True) + self.assertEqual(len(refs), 8, str(refs)) + refs, _ = self.qGraph.get_refs(include_init_inputs=True) + self.assertEqual(len(refs), 2, str(refs)) + refs, _ = self.qGraph.get_refs(include_init_outputs=True) + self.assertEqual(len(refs), 4, str(refs)) + refs, _ = self.qGraph.get_refs(include_outputs=True) + self.assertEqual(len(refs), 8, str(refs)) + refs, _ = self.qGraph.get_refs(include_inputs=True, include_outputs=True) + self.assertEqual(len(refs), 12, str(refs)) + refs, _ = self.qGraph.get_refs( + include_inputs=True, include_outputs=True, include_init_inputs=True, include_init_outputs=True + ) + self.assertEqual(len(refs), 16, str(refs)) + refs, _ = self.qGraph.get_refs(include_intermediates=True) + self.assertEqual(len(refs), 6, str(refs)) + refs, _ = self.qGraph.get_refs(include_intermediates=False) + self.assertEqual(len(refs), 0, str(refs)) + refs, _ = self.qGraph.get_refs(include_intermediates=False, include_inputs=True, include_outputs=True) + self.assertEqual(len(refs), 8, str(refs)) + class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): """Run file leak tests.""" From 5a6f80f52ccbf09d43c94e029c266af6b9933267 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Tue, 5 Nov 2024 12:48:30 -0700 Subject: [PATCH 4/6] Use newer ref.replace API rather than overrideStorageClass --- python/lsst/pipe/base/graph/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/graph/graph.py b/python/lsst/pipe/base/graph/graph.py index 373575381..29cdbc97c 100644 --- a/python/lsst/pipe/base/graph/graph.py +++ b/python/lsst/pipe/base/graph/graph.py @@ -1790,7 +1790,7 @@ def get_refs( def _update_ref(ref: DatasetRef) -> DatasetRef: internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: - ref = ref.overrideStorageClass(internal_dataset_type.storageClass_name) + ref = ref.replace(storage_class=internal_dataset_type.storageClass_name) return ref # Convert output_refs to the data repository storage classes, too. From ae7442711d06e2fb076095703f829d64d939f149 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Tue, 5 Nov 2024 13:10:21 -0700 Subject: [PATCH 5/6] Add news fragment --- doc/changes/DM-47328.feature.rst | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 doc/changes/DM-47328.feature.rst diff --git a/doc/changes/DM-47328.feature.rst b/doc/changes/DM-47328.feature.rst new file mode 100644 index 000000000..3ad2fc65a --- /dev/null +++ b/doc/changes/DM-47328.feature.rst @@ -0,0 +1,2 @@ +* Added new command-line ``butler retrieve-artifacts-for-quanta`` which can be used to retrieve input or output datasets associated with a graph or specific quanta. +* Added new ``QuantumGraph.get_refs()`` method to retrieve dataset refs from a graph. From 9465cbdd91617e581378d9c56b4535c4ed6f8435 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Tue, 5 Nov 2024 13:11:06 -0700 Subject: [PATCH 6/6] Test that each command-line functions --- .github/workflows/build.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 0d5fda462..c2d6f0916 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -59,6 +59,8 @@ jobs: run: | pytest -r a -v -n 3 --cov=lsst.pipe.base --cov=tests --cov-report=xml --cov-report=term --cov-branch butler register-instrument -h + butler transfer-from-graph -h + butler retrieve-artifacts-for-quanta -h - name: Upload coverage to codecov uses: codecov/codecov-action@v4 with: