diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 0d5fda462..c2d6f0916 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -59,6 +59,8 @@ jobs:
run: |
pytest -r a -v -n 3 --cov=lsst.pipe.base --cov=tests --cov-report=xml --cov-report=term --cov-branch
butler register-instrument -h
+ butler transfer-from-graph -h
+ butler retrieve-artifacts-for-quanta -h
- name: Upload coverage to codecov
uses: codecov/codecov-action@v4
with:
diff --git a/doc/changes/DM-47328.feature.rst b/doc/changes/DM-47328.feature.rst
new file mode 100644
index 000000000..3ad2fc65a
--- /dev/null
+++ b/doc/changes/DM-47328.feature.rst
@@ -0,0 +1,2 @@
+* Added new command-line ``butler retrieve-artifacts-for-quanta`` which can be used to retrieve input or output datasets associated with a graph or specific quanta.
+* Added new ``QuantumGraph.get_refs()`` method to retrieve dataset refs from a graph.
diff --git a/python/lsst/pipe/base/cli/cmd/__init__.py b/python/lsst/pipe/base/cli/cmd/__init__.py
index 546af6bf9..a2c4c28a9 100644
--- a/python/lsst/pipe/base/cli/cmd/__init__.py
+++ b/python/lsst/pipe/base/cli/cmd/__init__.py
@@ -25,6 +25,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph"]
+__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph", "retrieve_artifacts_for_quanta"]
-from .commands import register_instrument, transfer_from_graph, zip_from_graph
+from .commands import register_instrument, retrieve_artifacts_for_quanta, transfer_from_graph, zip_from_graph
diff --git a/python/lsst/pipe/base/cli/cmd/commands.py b/python/lsst/pipe/base/cli/cmd/commands.py
index f87670984..c6858d0e0 100644
--- a/python/lsst/pipe/base/cli/cmd/commands.py
+++ b/python/lsst/pipe/base/cli/cmd/commands.py
@@ -34,8 +34,9 @@
register_dataset_types_option,
repo_argument,
transfer_dimensions_option,
+ transfer_option,
)
-from lsst.daf.butler.cli.utils import ButlerCommand
+from lsst.daf.butler.cli.utils import ButlerCommand, split_commas, unwrap
from ... import script
from ..opt import instrument_argument, update_output_chain_option
@@ -92,3 +93,61 @@ def zip_from_graph(**kwargs: Any) -> None:
"""
zip = script.zip_from_graph(**kwargs)
print(f"Zip archive written to {zip}")
+
+
+@click.command(short_help="Retrieve artifacts from subset of graph.", cls=ButlerCommand)
+@click.argument("graph", required=True)
+@repo_argument(
+ required=True,
+ help="REPO is a URI to a butler configuration that is used to configure "
+ "the datastore of the quantum-backed butler.",
+)
+@click.argument("dest", required=True)
+@transfer_option()
+@click.option(
+ "--preserve-path/--no-preserve-path",
+ is_flag=True,
+ default=True,
+ help="Preserve the datastore path to the artifact at the destination.",
+)
+@click.option(
+ "--clobber/--no-clobber",
+ is_flag=True,
+ default=False,
+ help="If clobber, overwrite files if they exist locally.",
+)
+@click.option(
+ "--qgraph-node-id",
+ callback=split_commas,
+ multiple=True,
+ help=unwrap(
+ """Only load a specified set of nodes when graph is
+ loaded from a file, nodes are identified by UUID
+ values. One or more comma-separated strings are
+ accepted. By default all nodes are loaded. Ignored if
+ graph is not loaded from a file."""
+ ),
+)
+@click.option(
+ "--include-inputs/--no-include-inputs",
+ is_flag=True,
+ default=True,
+ help="Whether to include input datasets in retrieval.",
+)
+@click.option(
+ "--include-outputs/--no-include-outputs",
+ is_flag=True,
+ default=True,
+    help="Whether to include output datasets in retrieval.",
+)
+@options_file_option()
+def retrieve_artifacts_for_quanta(**kwargs: Any) -> None:
+ """Retrieve artifacts from given quanta defined in quantum graph.
+
+    GRAPH is a URI to the source quantum graph file to use when selecting the
+    artifacts to retrieve.
+
+    DEST is a directory to write the retrieved artifacts.
+ """
+ artifacts = script.retrieve_artifacts_for_quanta(**kwargs)
+ print(f"Written {len(artifacts)} artifacts to {kwargs['dest']}.")
diff --git a/python/lsst/pipe/base/cli/resources.yaml b/python/lsst/pipe/base/cli/resources.yaml
index 9dedebeed..6a166be64 100644
--- a/python/lsst/pipe/base/cli/resources.yaml
+++ b/python/lsst/pipe/base/cli/resources.yaml
@@ -4,3 +4,4 @@ cmd:
- register-instrument
- transfer-from-graph
- zip-from-graph
+ - retrieve-artifacts-for-quanta
diff --git a/python/lsst/pipe/base/graph/graph.py b/python/lsst/pipe/base/graph/graph.py
index 0cce2b3d7..29cdbc97c 100644
--- a/python/lsst/pipe/base/graph/graph.py
+++ b/python/lsst/pipe/base/graph/graph.py
@@ -44,6 +44,7 @@
from types import MappingProxyType
from typing import Any, BinaryIO, TypeVar
+import lsst.utils.logging
import networkx as nx
from lsst.daf.butler import (
Config,
@@ -75,6 +76,7 @@
from .quantumNode import BuildId, QuantumNode
_T = TypeVar("_T", bound="QuantumGraph")
+_LOG = lsst.utils.logging.getLogger(__name__)
# modify this constant any time the on disk representation of the save file
# changes, and update the load helpers to behave properly for each version.
@@ -909,7 +911,7 @@ def loadUri(
cls,
uri: ResourcePathExpression,
universe: DimensionUniverse | None = None,
- nodes: Iterable[uuid.UUID] | None = None,
+ nodes: Iterable[uuid.UUID | str] | None = None,
graphID: BuildId | None = None,
minimumVersion: int = 3,
) -> QuantumGraph:
@@ -924,7 +926,7 @@ def loadUri(
saved structure. If supplied, the
`~lsst.daf.butler.DimensionUniverse` from the loaded `QuantumGraph`
will be validated against the supplied argument for compatibility.
- nodes : iterable of `uuid.UUID` or `None`
+ nodes : iterable of [ `uuid.UUID` | `str` ] or `None`
UUIDs that correspond to nodes in the graph. If specified, only
these nodes will be loaded. Defaults to None, in which case all
nodes will be loaded.
@@ -1656,3 +1658,153 @@ def init_output_run(self, butler: LimitedButler, existing: bool = True) -> None:
self.write_configs(butler, compare_existing=existing)
self.write_packages(butler, compare_existing=existing)
self.write_init_outputs(butler, skip_existing=existing)
+
+ def get_refs(
+ self,
+ *,
+ include_init_inputs: bool = False,
+ include_inputs: bool = False,
+ include_intermediates: bool | None = None,
+ include_init_outputs: bool = False,
+ include_outputs: bool = False,
+ conform_outputs: bool = True,
+ ) -> tuple[set[DatasetRef], dict[str, DatastoreRecordData]]:
+ """Get the requested dataset refs from the graph.
+
+ Parameters
+ ----------
+ include_init_inputs : `bool`, optional
+ Include init inputs.
+ include_inputs : `bool`, optional
+ Include inputs.
+ include_intermediates : `bool` or `None`, optional
+ If `None`, no special handling for intermediates is performed.
+ If `True` intermediates are calculated even if other flags
+ do not request datasets. If `False` intermediates will be removed
+ from any results.
+ include_init_outputs : `bool`, optional
+        Include init outputs.
+ include_outputs : `bool`, optional
+ Include outputs.
+ conform_outputs : `bool`, optional
+ Whether any outputs found should have their dataset types conformed
+ with the registry dataset types.
+
+ Returns
+ -------
+ refs : `set` [ `lsst.daf.butler.DatasetRef` ]
+ The requested dataset refs found in the graph.
+ datastore_records : `dict` [ `str`, \
+ `lsst.daf.butler.datastore.record_data.DatastoreRecordData` ]
+ Any datastore records found.
+
+ Notes
+ -----
+ Conforming and requesting inputs and outputs can result in the same
+ dataset appearing in the results twice with differing storage classes.
+ """
+ datastore_records: dict[str, DatastoreRecordData] = {}
+ init_input_refs: set[DatasetRef] = set()
+ init_output_refs: set[DatasetRef] = set(self.globalInitOutputRefs())
+
+ if include_intermediates is True:
+ # Need to enable inputs and outputs even if not explicitly
+ # requested.
+ request_include_init_inputs = True
+ request_include_inputs = True
+ request_include_init_outputs = True
+ request_include_outputs = True
+ else:
+ request_include_init_inputs = include_init_inputs
+ request_include_inputs = include_inputs
+ request_include_init_outputs = include_init_outputs
+ request_include_outputs = include_outputs
+
+ if request_include_init_inputs or request_include_init_outputs:
+ for task_def in self.iterTaskGraph():
+ if request_include_init_inputs:
+ if in_refs := self.initInputRefs(task_def):
+ init_input_refs.update(in_refs)
+ if request_include_init_outputs:
+ if out_refs := self.initOutputRefs(task_def):
+ init_output_refs.update(out_refs)
+
+ input_refs: set[DatasetRef] = set()
+ output_refs: set[DatasetRef] = set()
+
+ for qnode in self:
+ if request_include_inputs:
+ for other_refs in qnode.quantum.inputs.values():
+ input_refs.update(other_refs)
+ # Inputs can come with datastore records.
+ for store_name, records in qnode.quantum.datastore_records.items():
+ datastore_records.setdefault(store_name, DatastoreRecordData()).update(records)
+ if request_include_outputs:
+ for other_refs in qnode.quantum.outputs.values():
+ output_refs.update(other_refs)
+
+ # Intermediates are the intersection of inputs and outputs. Must do
+ # this analysis before conforming since dataset type changes will
+ # change set membership.
+ inter_msg = ""
+ intermediates = set()
+ if include_intermediates is not None:
+ intermediates = (input_refs | init_input_refs) & (output_refs | init_output_refs)
+
+ if include_intermediates is False:
+ # Remove intermediates from results.
+ init_input_refs -= intermediates
+ input_refs -= intermediates
+ init_output_refs -= intermediates
+ output_refs -= intermediates
+ inter_msg = f"; Intermediates removed: {len(intermediates)}"
+ intermediates = set()
+ elif include_intermediates is True:
+ # Do not mention intermediates if all the input/output flags
+ # would have resulted in them anyhow.
+ if (
+ (request_include_init_inputs is not include_init_inputs)
+ or (request_include_inputs is not include_inputs)
+ or (request_include_init_outputs is not include_init_outputs)
+ or (request_include_outputs is not include_outputs)
+ ):
+ inter_msg = f"; including intermediates: {len(intermediates)}"
+
+ # Assign intermediates to the relevant category.
+ if not include_init_inputs:
+ init_input_refs &= intermediates
+ if not include_inputs:
+ input_refs &= intermediates
+ if not include_init_outputs:
+ init_output_refs &= intermediates
+ if not include_outputs:
+ output_refs &= intermediates
+
+ # Conforming can result in an input ref and an output ref appearing
+ # in the returned results that are identical apart from storage class.
+ if conform_outputs:
+ # Get data repository definitions from the QuantumGraph; these can
+ # have different storage classes than those in the quanta.
+ dataset_types = {dstype.name: dstype for dstype in self.registryDatasetTypes()}
+
+ def _update_ref(ref: DatasetRef) -> DatasetRef:
+ internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType)
+ if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name:
+ ref = ref.replace(storage_class=internal_dataset_type.storageClass_name)
+ return ref
+
+ # Convert output_refs to the data repository storage classes, too.
+ output_refs = {_update_ref(ref) for ref in output_refs}
+ init_output_refs = {_update_ref(ref) for ref in init_output_refs}
+
+ _LOG.verbose(
+            "Found the following datasets. InitInputs: %d; Inputs: %d; InitOutputs: %d; Outputs: %d%s",
+ len(init_input_refs),
+ len(input_refs),
+ len(init_output_refs),
+ len(output_refs),
+ inter_msg,
+ )
+
+ refs = input_refs | init_input_refs | init_output_refs | output_refs
+ return refs, datastore_records
diff --git a/python/lsst/pipe/base/script/__init__.py b/python/lsst/pipe/base/script/__init__.py
index f197d49c9..e648c0ec3 100644
--- a/python/lsst/pipe/base/script/__init__.py
+++ b/python/lsst/pipe/base/script/__init__.py
@@ -26,5 +26,6 @@
# along with this program. If not, see .
from .register_instrument import register_instrument
+from .retrieve_artifacts_for_quanta import retrieve_artifacts_for_quanta
from .transfer_from_graph import transfer_from_graph
from .zip_from_graph import zip_from_graph
diff --git a/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py
new file mode 100644
index 000000000..fed44c01b
--- /dev/null
+++ b/python/lsst/pipe/base/script/retrieve_artifacts_for_quanta.py
@@ -0,0 +1,108 @@
+# This file is part of pipe_base.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This software is dual licensed under the GNU General Public License and also
+# under a 3-clause BSD license. Recipients may choose which of these licenses
+# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
+# respectively. If you choose the GPL option then the following text applies
+# (but note that there is still no warranty even if you opt for BSD instead):
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+__all__ = ["retrieve_artifacts_for_quanta"]
+
+import logging
+
+from lsst.daf.butler import QuantumBackedButler
+from lsst.pipe.base import QuantumGraph
+from lsst.resources import ResourcePath
+
+_LOG = logging.getLogger(__name__)
+
+
+def retrieve_artifacts_for_quanta(
+ graph: str,
+ repo: str,
+ dest: str,
+ transfer: str,
+ preserve_path: bool,
+ clobber: bool,
+ qgraph_node_id: list[str],
+ include_inputs: bool,
+ include_outputs: bool,
+) -> list[ResourcePath]:
+ """Retrieve artifacts referenced in a graph and store locally.
+
+ Parameters
+ ----------
+ graph : `str`
+ URI string of the quantum graph.
+ repo : `str`
+ URI string of the Butler repo to use.
+ dest : `str`
+ URI string of the directory to write the artifacts.
+ transfer : `str`
+ Transfer mode to use when placing artifacts in the destination.
+ preserve_path : `bool`
+ If `True` the full datastore path will be retained within the
+ destination directory, else only the filename will be used.
+ clobber : `bool`
+ If `True` allow transfers to overwrite files at the destination.
+ qgraph_node_id : `tuple` [ `str` ]
+ Quanta to extract.
+ include_inputs : `bool`
+ Whether to include input datasets in retrieval.
+ include_outputs : `bool`
+ Whether to include output datasets in retrieval.
+
+ Returns
+ -------
+ paths : `list` [ `lsst.resources.ResourcePath` ]
+ The paths to the artifacts that were written.
+ """
+ # Read graph into memory.
+ nodes = qgraph_node_id or None
+ qgraph = QuantumGraph.loadUri(graph, nodes=nodes)
+
+ refs, datastore_records = qgraph.get_refs(
+ include_inputs=include_inputs,
+ include_init_inputs=include_inputs,
+ include_outputs=include_outputs,
+ include_init_outputs=include_outputs,
+ conform_outputs=True, # Need to look for predicted outputs with correct storage class.
+ )
+
+ # Get data repository definitions from the QuantumGraph; these can have
+ # different storage classes than those in the quanta.
+ dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}
+
+ # Make QBB, its config is the same as output Butler.
+ qbb = QuantumBackedButler.from_predicted(
+ config=repo,
+ predicted_inputs=[ref.id for ref in refs],
+ predicted_outputs=[],
+ dimensions=qgraph.universe,
+ datastore_records=datastore_records,
+ dataset_types=dataset_types,
+ )
+
+ paths = qbb.retrieve_artifacts(
+ refs, dest, transfer=transfer, overwrite=clobber, preserve_path=preserve_path
+ )
+ return paths
diff --git a/python/lsst/pipe/base/script/transfer_from_graph.py b/python/lsst/pipe/base/script/transfer_from_graph.py
index 68b672400..1a3456048 100644
--- a/python/lsst/pipe/base/script/transfer_from_graph.py
+++ b/python/lsst/pipe/base/script/transfer_from_graph.py
@@ -27,7 +27,7 @@
__all__ = ["transfer_from_graph"]
-from lsst.daf.butler import Butler, CollectionType, DatasetRef, QuantumBackedButler, Registry
+from lsst.daf.butler import Butler, CollectionType, QuantumBackedButler, Registry
from lsst.daf.butler.registry import MissingCollectionError
from lsst.pipe.base import QuantumGraph
@@ -69,27 +69,10 @@ def transfer_from_graph(
# Read whole graph into memory
qgraph = QuantumGraph.loadUri(graph)
- # Collect output refs that could be created by this graph.
- original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs())
- for task_def in qgraph.iterTaskGraph():
- if refs := qgraph.initOutputRefs(task_def):
- original_output_refs.update(refs)
- for qnode in qgraph:
- for otherRefs in qnode.quantum.outputs.values():
- original_output_refs.update(otherRefs)
-
- # Get data repository definitions from the QuantumGraph; these can have
- # different storage classes than those in the quanta.
- dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}
+ output_refs, _ = qgraph.get_refs(include_outputs=True, include_init_outputs=True, conform_outputs=True)
- # Convert output_refs to the data repository storage classes, too.
- output_refs = set()
- for ref in original_output_refs:
- internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType)
- if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name:
- output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name))
- else:
- output_refs.add(ref)
+ # Get data repository dataset type definitions from the QuantumGraph.
+ dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}
# Make QBB, its config is the same as output Butler.
qbb = QuantumBackedButler.from_predicted(
diff --git a/python/lsst/pipe/base/script/zip_from_graph.py b/python/lsst/pipe/base/script/zip_from_graph.py
index ce36aedac..b5c16f9e5 100644
--- a/python/lsst/pipe/base/script/zip_from_graph.py
+++ b/python/lsst/pipe/base/script/zip_from_graph.py
@@ -30,7 +30,7 @@
import logging
import re
-from lsst.daf.butler import DatasetRef, QuantumBackedButler
+from lsst.daf.butler import QuantumBackedButler
from lsst.daf.butler.utils import globToRegex
from lsst.pipe.base import QuantumGraph
from lsst.resources import ResourcePath
@@ -67,28 +67,11 @@ def zip_from_graph(
# Read whole graph into memory
qgraph = QuantumGraph.loadUri(graph)
- # Collect output refs that could be created by this graph.
- original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs())
- for task_def in qgraph.iterTaskGraph():
- if refs := qgraph.initOutputRefs(task_def):
- original_output_refs.update(refs)
- for qnode in qgraph:
- for otherRefs in qnode.quantum.outputs.values():
- original_output_refs.update(otherRefs)
+ output_refs, _ = qgraph.get_refs(include_outputs=True, include_init_outputs=True, conform_outputs=True)
- # Get data repository definitions from the QuantumGraph; these can have
- # different storage classes than those in the quanta.
+ # Get data repository dataset type definitions from the QuantumGraph.
dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()}
- # Convert output_refs to the data repository storage classes, too.
- output_refs = set()
- for ref in original_output_refs:
- internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType)
- if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name:
- output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name))
- else:
- output_refs.add(ref)
-
# Make QBB, its config is the same as output Butler.
qbb = QuantumBackedButler.from_predicted(
config=repo,
diff --git a/tests/test_quantumGraph.py b/tests/test_quantumGraph.py
index 84c1d7636..f495ac284 100644
--- a/tests/test_quantumGraph.py
+++ b/tests/test_quantumGraph.py
@@ -617,6 +617,29 @@ def testGetSummary(self) -> None:
self.assertEqual(self.qGraph.graphID, summary.graphID)
self.assertEqual(len(summary.qgraphTaskSummaries), len(self.qGraph.taskGraph))
+ def test_get_refs(self) -> None:
+ """Test that dataset refs can be retrieved from graph."""
+ refs, _ = self.qGraph.get_refs(include_inputs=True)
+ self.assertEqual(len(refs), 8, str(refs))
+ refs, _ = self.qGraph.get_refs(include_init_inputs=True)
+ self.assertEqual(len(refs), 2, str(refs))
+ refs, _ = self.qGraph.get_refs(include_init_outputs=True)
+ self.assertEqual(len(refs), 4, str(refs))
+ refs, _ = self.qGraph.get_refs(include_outputs=True)
+ self.assertEqual(len(refs), 8, str(refs))
+ refs, _ = self.qGraph.get_refs(include_inputs=True, include_outputs=True)
+ self.assertEqual(len(refs), 12, str(refs))
+ refs, _ = self.qGraph.get_refs(
+ include_inputs=True, include_outputs=True, include_init_inputs=True, include_init_outputs=True
+ )
+ self.assertEqual(len(refs), 16, str(refs))
+ refs, _ = self.qGraph.get_refs(include_intermediates=True)
+ self.assertEqual(len(refs), 6, str(refs))
+ refs, _ = self.qGraph.get_refs(include_intermediates=False)
+ self.assertEqual(len(refs), 0, str(refs))
+ refs, _ = self.qGraph.get_refs(include_intermediates=False, include_inputs=True, include_outputs=True)
+ self.assertEqual(len(refs), 8, str(refs))
+
class MyMemoryTestCase(lsst.utils.tests.MemoryTestCase):
"""Run file leak tests."""