diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 0d5fda462..c2d6f0916 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -59,6 +59,8 @@ jobs: run: | pytest -r a -v -n 3 --cov=lsst.pipe.base --cov=tests --cov-report=xml --cov-report=term --cov-branch butler register-instrument -h + butler transfer-from-graph -h + butler retrieve-artifacts-for-quanta -h - name: Upload coverage to codecov uses: codecov/codecov-action@v4 with: diff --git a/doc/changes/DM-47328.feature.rst b/doc/changes/DM-47328.feature.rst new file mode 100644 index 000000000..3ad2fc65a --- /dev/null +++ b/doc/changes/DM-47328.feature.rst @@ -0,0 +1,2 @@ +* Added new command-line ``butler retrieve-artifacts-for-quanta`` which can be used to retrieve input or output datasets associated with a graph or specific quanta. +* Added new ``QuantumGraph.get_refs()`` method to retrieve dataset refs from a graph. diff --git a/python/lsst/pipe/base/cli/cmd/__init__.py b/python/lsst/pipe/base/cli/cmd/__init__.py index 546af6bf9..a2c4c28a9 100644 --- a/python/lsst/pipe/base/cli/cmd/__init__.py +++ b/python/lsst/pipe/base/cli/cmd/__init__.py @@ -25,6 +25,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
@click.command(short_help="Retrieve artifacts from subset of graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(
    required=True,
    help="REPO is a URI to a butler configuration that is used to configure "
    "the datastore of the quantum-backed butler.",
)
@click.argument("dest", required=True)
@transfer_option()
@click.option(
    "--preserve-path/--no-preserve-path",
    is_flag=True,
    default=True,
    help="Preserve the datastore path to the artifact at the destination.",
)
@click.option(
    "--clobber/--no-clobber",
    is_flag=True,
    default=False,
    help="If clobber, overwrite files if they exist locally.",
)
@click.option(
    "--qgraph-node-id",
    callback=split_commas,
    multiple=True,
    help=unwrap(
        """Only load a specified set of nodes when graph is
        loaded from a file, nodes are identified by UUID
        values. One or more comma-separated strings are
        accepted. By default all nodes are loaded. Ignored if
        graph is not loaded from a file."""
    ),
)
@click.option(
    "--include-inputs/--no-include-inputs",
    is_flag=True,
    default=True,
    help="Whether to include input datasets in retrieval.",
)
@click.option(
    "--include-outputs/--no-include-outputs",
    is_flag=True,
    default=True,
    help="Whether to include output datasets in retrieval.",
)
@options_file_option()
def retrieve_artifacts_for_quanta(**kwargs: Any) -> None:
    """Retrieve artifacts from given quanta defined in quantum graph.

    GRAPH is a URI to the source quantum graph file.

    DEST is a directory to which the artifacts will be written.
    """
    # Delegate all the work to the script layer; this function only handles
    # command-line plumbing and reporting.
    artifacts = script.retrieve_artifacts_for_quanta(**kwargs)
    print(f"Written {len(artifacts)} artifacts to {kwargs['dest']}.")
@@ -909,7 +911,7 @@ def loadUri( cls, uri: ResourcePathExpression, universe: DimensionUniverse | None = None, - nodes: Iterable[uuid.UUID] | None = None, + nodes: Iterable[uuid.UUID | str] | None = None, graphID: BuildId | None = None, minimumVersion: int = 3, ) -> QuantumGraph: @@ -924,7 +926,7 @@ def loadUri( saved structure. If supplied, the `~lsst.daf.butler.DimensionUniverse` from the loaded `QuantumGraph` will be validated against the supplied argument for compatibility. - nodes : iterable of `uuid.UUID` or `None` + nodes : iterable of [ `uuid.UUID` | `str` ] or `None` UUIDs that correspond to nodes in the graph. If specified, only these nodes will be loaded. Defaults to None, in which case all nodes will be loaded. @@ -1656,3 +1658,153 @@ def init_output_run(self, butler: LimitedButler, existing: bool = True) -> None: self.write_configs(butler, compare_existing=existing) self.write_packages(butler, compare_existing=existing) self.write_init_outputs(butler, skip_existing=existing) + + def get_refs( + self, + *, + include_init_inputs: bool = False, + include_inputs: bool = False, + include_intermediates: bool | None = None, + include_init_outputs: bool = False, + include_outputs: bool = False, + conform_outputs: bool = True, + ) -> tuple[set[DatasetRef], dict[str, DatastoreRecordData]]: + """Get the requested dataset refs from the graph. + + Parameters + ---------- + include_init_inputs : `bool`, optional + Include init inputs. + include_inputs : `bool`, optional + Include inputs. + include_intermediates : `bool` or `None`, optional + If `None`, no special handling for intermediates is performed. + If `True` intermediates are calculated even if other flags + do not request datasets. If `False` intermediates will be removed + from any results. + include_init_outputs : `bool`, optional + Include init outpus. + include_outputs : `bool`, optional + Include outputs. 
+ conform_outputs : `bool`, optional + Whether any outputs found should have their dataset types conformed + with the registry dataset types. + + Returns + ------- + refs : `set` [ `lsst.daf.butler.DatasetRef` ] + The requested dataset refs found in the graph. + datastore_records : `dict` [ `str`, \ + `lsst.daf.butler.datastore.record_data.DatastoreRecordData` ] + Any datastore records found. + + Notes + ----- + Conforming and requesting inputs and outputs can result in the same + dataset appearing in the results twice with differing storage classes. + """ + datastore_records: dict[str, DatastoreRecordData] = {} + init_input_refs: set[DatasetRef] = set() + init_output_refs: set[DatasetRef] = set(self.globalInitOutputRefs()) + + if include_intermediates is True: + # Need to enable inputs and outputs even if not explicitly + # requested. + request_include_init_inputs = True + request_include_inputs = True + request_include_init_outputs = True + request_include_outputs = True + else: + request_include_init_inputs = include_init_inputs + request_include_inputs = include_inputs + request_include_init_outputs = include_init_outputs + request_include_outputs = include_outputs + + if request_include_init_inputs or request_include_init_outputs: + for task_def in self.iterTaskGraph(): + if request_include_init_inputs: + if in_refs := self.initInputRefs(task_def): + init_input_refs.update(in_refs) + if request_include_init_outputs: + if out_refs := self.initOutputRefs(task_def): + init_output_refs.update(out_refs) + + input_refs: set[DatasetRef] = set() + output_refs: set[DatasetRef] = set() + + for qnode in self: + if request_include_inputs: + for other_refs in qnode.quantum.inputs.values(): + input_refs.update(other_refs) + # Inputs can come with datastore records. 
+ for store_name, records in qnode.quantum.datastore_records.items(): + datastore_records.setdefault(store_name, DatastoreRecordData()).update(records) + if request_include_outputs: + for other_refs in qnode.quantum.outputs.values(): + output_refs.update(other_refs) + + # Intermediates are the intersection of inputs and outputs. Must do + # this analysis before conforming since dataset type changes will + # change set membership. + inter_msg = "" + intermediates = set() + if include_intermediates is not None: + intermediates = (input_refs | init_input_refs) & (output_refs | init_output_refs) + + if include_intermediates is False: + # Remove intermediates from results. + init_input_refs -= intermediates + input_refs -= intermediates + init_output_refs -= intermediates + output_refs -= intermediates + inter_msg = f"; Intermediates removed: {len(intermediates)}" + intermediates = set() + elif include_intermediates is True: + # Do not mention intermediates if all the input/output flags + # would have resulted in them anyhow. + if ( + (request_include_init_inputs is not include_init_inputs) + or (request_include_inputs is not include_inputs) + or (request_include_init_outputs is not include_init_outputs) + or (request_include_outputs is not include_outputs) + ): + inter_msg = f"; including intermediates: {len(intermediates)}" + + # Assign intermediates to the relevant category. + if not include_init_inputs: + init_input_refs &= intermediates + if not include_inputs: + input_refs &= intermediates + if not include_init_outputs: + init_output_refs &= intermediates + if not include_outputs: + output_refs &= intermediates + + # Conforming can result in an input ref and an output ref appearing + # in the returned results that are identical apart from storage class. + if conform_outputs: + # Get data repository definitions from the QuantumGraph; these can + # have different storage classes than those in the quanta. 
+ dataset_types = {dstype.name: dstype for dstype in self.registryDatasetTypes()} + + def _update_ref(ref: DatasetRef) -> DatasetRef: + internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) + if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: + ref = ref.replace(storage_class=internal_dataset_type.storageClass_name) + return ref + + # Convert output_refs to the data repository storage classes, too. + output_refs = {_update_ref(ref) for ref in output_refs} + init_output_refs = {_update_ref(ref) for ref in init_output_refs} + + _LOG.verbose( + "Found the following datasets. InitInputs: %d; Inputs: %d; InitOutputs: %s; Outputs: %d%s", + len(init_input_refs), + len(input_refs), + len(init_output_refs), + len(output_refs), + inter_msg, + ) + + refs = input_refs | init_input_refs | init_output_refs | output_refs + return refs, datastore_records diff --git a/python/lsst/pipe/base/script/__init__.py b/python/lsst/pipe/base/script/__init__.py index f197d49c9..e648c0ec3 100644 --- a/python/lsst/pipe/base/script/__init__.py +++ b/python/lsst/pipe/base/script/__init__.py @@ -26,5 +26,6 @@ # along with this program. If not, see . from .register_instrument import register_instrument +from .retrieve_artifacts_for_quanta import retrieve_artifacts_for_quanta from .transfer_from_graph import transfer_from_graph from .zip_from_graph import zip_from_graph diff --git a/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py new file mode 100644 index 000000000..fed44c01b --- /dev/null +++ b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py @@ -0,0 +1,108 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. 
def retrieve_artifacts_for_quanta(
    graph: str,
    repo: str,
    dest: str,
    transfer: str,
    preserve_path: bool,
    clobber: bool,
    qgraph_node_id: list[str],
    include_inputs: bool,
    include_outputs: bool,
) -> list[ResourcePath]:
    """Retrieve artifacts referenced in a graph and store locally.

    Parameters
    ----------
    graph : `str`
        URI string of the quantum graph.
    repo : `str`
        URI string of the Butler repo to use.
    dest : `str`
        URI string of the directory to write the artifacts.
    transfer : `str`
        Transfer mode to use when placing artifacts in the destination.
    preserve_path : `bool`
        If `True` the full datastore path will be retained within the
        destination directory, else only the filename will be used.
    clobber : `bool`
        If `True` allow transfers to overwrite files at the destination.
    qgraph_node_id : `list` [ `str` ]
        Quanta to extract. If empty, all quanta in the graph are used.
    include_inputs : `bool`
        Whether to include input datasets in retrieval.
    include_outputs : `bool`
        Whether to include output datasets in retrieval.

    Returns
    -------
    paths : `list` [ `lsst.resources.ResourcePath` ]
        The paths to the artifacts that were written.
    """
    # Read graph into memory, restricting to the requested nodes; an empty
    # node list means "load everything".
    nodes = qgraph_node_id or None
    qgraph = QuantumGraph.loadUri(graph, nodes=nodes)

    # Init-input/output inclusion follows the corresponding run-time flag.
    refs, datastore_records = qgraph.get_refs(
        include_inputs=include_inputs,
        include_init_inputs=include_inputs,
        include_outputs=include_outputs,
        include_init_outputs=include_outputs,
        conform_outputs=True,  # Need to look for predicted outputs with correct storage class.
    )

    # Get data repository definitions from the QuantumGraph; these can have
    # different storage classes than those in the quanta.
    dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}

    # Make QBB, its config is the same as output Butler.
    qbb = QuantumBackedButler.from_predicted(
        config=repo,
        predicted_inputs=[ref.id for ref in refs],
        predicted_outputs=[],
        dimensions=qgraph.universe,
        datastore_records=datastore_records,
        dataset_types=dataset_types,
    )

    paths = qbb.retrieve_artifacts(
        refs, dest, transfer=transfer, overwrite=clobber, preserve_path=preserve_path
    )
    return paths
- output_refs = set() - for ref in original_output_refs: - internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) - if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: - output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name)) - else: - output_refs.add(ref) + # Get data repository dataset type definitions from the QuantumGraph. + dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} # Make QBB, its config is the same as output Butler. qbb = QuantumBackedButler.from_predicted( diff --git a/python/lsst/pipe/base/script/zip_from_graph.py b/python/lsst/pipe/base/script/zip_from_graph.py index ce36aedac..b5c16f9e5 100644 --- a/python/lsst/pipe/base/script/zip_from_graph.py +++ b/python/lsst/pipe/base/script/zip_from_graph.py @@ -30,7 +30,7 @@ import logging import re -from lsst.daf.butler import DatasetRef, QuantumBackedButler +from lsst.daf.butler import QuantumBackedButler from lsst.daf.butler.utils import globToRegex from lsst.pipe.base import QuantumGraph from lsst.resources import ResourcePath @@ -67,28 +67,11 @@ def zip_from_graph( # Read whole graph into memory qgraph = QuantumGraph.loadUri(graph) - # Collect output refs that could be created by this graph. - original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs()) - for task_def in qgraph.iterTaskGraph(): - if refs := qgraph.initOutputRefs(task_def): - original_output_refs.update(refs) - for qnode in qgraph: - for otherRefs in qnode.quantum.outputs.values(): - original_output_refs.update(otherRefs) + output_refs, _ = qgraph.get_refs(include_outputs=True, include_init_outputs=True, conform_outputs=True) - # Get data repository definitions from the QuantumGraph; these can have - # different storage classes than those in the quanta. + # Get data repository dataset type definitions from the QuantumGraph. 
dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} - # Convert output_refs to the data repository storage classes, too. - output_refs = set() - for ref in original_output_refs: - internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) - if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: - output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name)) - else: - output_refs.add(ref) - # Make QBB, its config is the same as output Butler. qbb = QuantumBackedButler.from_predicted( config=repo, diff --git a/tests/test_quantumGraph.py b/tests/test_quantumGraph.py index 84c1d7636..f495ac284 100644 --- a/tests/test_quantumGraph.py +++ b/tests/test_quantumGraph.py @@ -617,6 +617,29 @@ def testGetSummary(self) -> None: self.assertEqual(self.qGraph.graphID, summary.graphID) self.assertEqual(len(summary.qgraphTaskSummaries), len(self.qGraph.taskGraph)) + def test_get_refs(self) -> None: + """Test that dataset refs can be retrieved from graph.""" + refs, _ = self.qGraph.get_refs(include_inputs=True) + self.assertEqual(len(refs), 8, str(refs)) + refs, _ = self.qGraph.get_refs(include_init_inputs=True) + self.assertEqual(len(refs), 2, str(refs)) + refs, _ = self.qGraph.get_refs(include_init_outputs=True) + self.assertEqual(len(refs), 4, str(refs)) + refs, _ = self.qGraph.get_refs(include_outputs=True) + self.assertEqual(len(refs), 8, str(refs)) + refs, _ = self.qGraph.get_refs(include_inputs=True, include_outputs=True) + self.assertEqual(len(refs), 12, str(refs)) + refs, _ = self.qGraph.get_refs( + include_inputs=True, include_outputs=True, include_init_inputs=True, include_init_outputs=True + ) + self.assertEqual(len(refs), 16, str(refs)) + refs, _ = self.qGraph.get_refs(include_intermediates=True) + self.assertEqual(len(refs), 6, str(refs)) + refs, _ = self.qGraph.get_refs(include_intermediates=False) + self.assertEqual(len(refs), 0, str(refs)) + refs, _ = 
self.qGraph.get_refs(include_intermediates=False, include_inputs=True, include_outputs=True) + self.assertEqual(len(refs), 8, str(refs)) + class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase): """Run file leak tests."""