diff --git a/python/lsst/pipe/base/cli/cmd/__init__.py b/python/lsst/pipe/base/cli/cmd/__init__.py index ae7c25112..d294b26ca 100644 --- a/python/lsst/pipe/base/cli/cmd/__init__.py +++ b/python/lsst/pipe/base/cli/cmd/__init__.py @@ -27,4 +27,4 @@ -__all__ = ["register_instrument", "transfer_from_graph"] +__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph"] -from .commands import register_instrument, transfer_from_graph +from .commands import register_instrument, transfer_from_graph, zip_from_graph diff --git a/python/lsst/pipe/base/cli/cmd/commands.py b/python/lsst/pipe/base/cli/cmd/commands.py index 2db7fee13..1c04bbcdf 100644 --- a/python/lsst/pipe/base/cli/cmd/commands.py +++ b/python/lsst/pipe/base/cli/cmd/commands.py @@ -29,6 +29,7 @@ import click from lsst.daf.butler.cli.opt import ( + dataset_type_option, options_file_option, register_dataset_types_option, repo_argument, @@ -62,10 +63,31 @@ def transfer_from_graph(**kwargs: Any) -> None: """Transfer datasets from a quantum graph to a destination butler. - SOURCE is a URI to the Butler repository containing the RUN dataset. + SOURCE is a URI to the source quantum graph file. DEST is a URI to the Butler repository that will receive copies of the datasets. """ number = script.transfer_from_graph(**kwargs) print(f"Number of datasets transferred: {number}") + + +@click.command(short_help="Make Zip archive from output files using graph.", cls=ButlerCommand) +@click.argument("graph", required=True) +@repo_argument(required=True) +@click.argument("dest", required=True) +@dataset_type_option(help="Dataset types to include in Zip archive.") +@options_file_option() +def zip_from_graph(**kwargs: Any) -> None: + """Create a Zip archive from the output datasets of a quantum graph. + + GRAPH is a URI to the source quantum graph file to use when building the + Zip archive. + + REPO is a URI to a butler configuration that is used to configure the + datastore of the quantum-backed butler.
+ + DEST is a directory to write the Zip archive. + """ + zip_path = script.zip_from_graph(**kwargs) + print(f"Zip archive written to {zip_path}") diff --git a/python/lsst/pipe/base/cli/resources.yaml b/python/lsst/pipe/base/cli/resources.yaml index 737276ab2..9dedebeed 100644 --- a/python/lsst/pipe/base/cli/resources.yaml +++ b/python/lsst/pipe/base/cli/resources.yaml @@ -3,3 +3,4 @@ cmd: commands: - register-instrument - transfer-from-graph + - zip-from-graph diff --git a/python/lsst/pipe/base/script/__init__.py b/python/lsst/pipe/base/script/__init__.py index 29253c946..f197d49c9 100644 --- a/python/lsst/pipe/base/script/__init__.py +++ b/python/lsst/pipe/base/script/__init__.py @@ -27,3 +27,4 @@ from .register_instrument import register_instrument from .transfer_from_graph import transfer_from_graph +from .zip_from_graph import zip_from_graph diff --git a/python/lsst/pipe/base/script/zip_from_graph.py b/python/lsst/pipe/base/script/zip_from_graph.py new file mode 100644 index 000000000..ce36aedac --- /dev/null +++ b/python/lsst/pipe/base/script/zip_from_graph.py @@ -0,0 +1,121 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively.
If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +__all__ = ["zip_from_graph"] + +import logging +import re + +from lsst.daf.butler import DatasetRef, QuantumBackedButler +from lsst.daf.butler.utils import globToRegex +from lsst.pipe.base import QuantumGraph +from lsst.resources import ResourcePath + +_LOG = logging.getLogger(__name__) + + +def zip_from_graph( + graph: str, + repo: str, + dest: str, + dataset_type: tuple[str, ...], +) -> ResourcePath: + """Create Zip export file from graph outputs. + + Parameters + ---------- + graph : `str` + URI string of the quantum graph. + repo : `str` + URI to a butler configuration used to define the datastore associated + with the graph. + dest : `str` + Path to the destination directory for the Zip file. + dataset_type : `tuple` of `str` + Dataset type names. An empty tuple implies all dataset types. + Can include globs. + + Returns + ------- + zip_path : `lsst.resources.ResourcePath` + Path to the Zip file. + """ + # Read whole graph into memory + qgraph = QuantumGraph.loadUri(graph) + + # Collect output refs that could be created by this graph.
+ original_output_refs: set[DatasetRef] = set(qgraph.globalInitOutputRefs()) + for task_def in qgraph.iterTaskGraph(): + if refs := qgraph.initOutputRefs(task_def): + original_output_refs.update(refs) + for qnode in qgraph: + for otherRefs in qnode.quantum.outputs.values(): + original_output_refs.update(otherRefs) + + # Get data repository definitions from the QuantumGraph; these can have + # different storage classes than those in the quanta. + dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} + + # Convert output_refs to the data repository storage classes, too. + output_refs = set() + for ref in original_output_refs: + internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType) + if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name: + output_refs.add(ref.overrideStorageClass(internal_dataset_type.storageClass_name)) + else: + output_refs.add(ref) + + # Make QBB, its config is the same as output Butler. + qbb = QuantumBackedButler.from_predicted( + config=repo, + predicted_inputs=[ref.id for ref in output_refs], + predicted_outputs=[], + dimensions=qgraph.universe, + datastore_records={}, + dataset_types=dataset_types, + ) + + # Filter the refs based on requested dataset types. + regexes = globToRegex(dataset_type) + if regexes is ...: + filtered_refs = output_refs + else: + + def _matches(dataset_type_name: str, regexes: list[str | re.Pattern]) -> bool: + for regex in regexes: + if isinstance(regex, str): + if dataset_type_name == regex: + return True + elif regex.search(dataset_type_name): + return True + return False + + filtered_refs = {ref for ref in output_refs if _matches(ref.datasetType.name, regexes)} + + _LOG.info("Retrieving artifacts for %d datasets and storing in Zip file.", len(filtered_refs)) + zip_path = qbb.retrieve_artifacts_zip(filtered_refs, dest) + return zip_path