Merge pull request #454 from lsst/tickets/DM-47328
DM-47328: Experimental command to copy files based on a graph
timj authored Nov 5, 2024
2 parents d7cbcf4 + 9465cbd commit 591c0c7
Showing 11 changed files with 360 additions and 46 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build.yaml
@@ -59,6 +59,8 @@ jobs:
run: |
pytest -r a -v -n 3 --cov=lsst.pipe.base --cov=tests --cov-report=xml --cov-report=term --cov-branch
butler register-instrument -h
butler transfer-from-graph -h
butler retrieve-artifacts-for-quanta -h
- name: Upload coverage to codecov
uses: codecov/codecov-action@v4
with:
2 changes: 2 additions & 0 deletions doc/changes/DM-47328.feature.rst
@@ -0,0 +1,2 @@
* Added a new command-line command ``butler retrieve-artifacts-for-quanta`` that can be used to retrieve input or output datasets associated with a graph or with specific quanta.
* Added a new ``QuantumGraph.get_refs()`` method to retrieve dataset refs from a graph.
4 changes: 2 additions & 2 deletions python/lsst/pipe/base/cli/cmd/__init__.py
@@ -25,6 +25,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph"]
__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph", "retrieve_artifacts_for_quanta"]

from .commands import register_instrument, transfer_from_graph, zip_from_graph
from .commands import register_instrument, retrieve_artifacts_for_quanta, transfer_from_graph, zip_from_graph
61 changes: 60 additions & 1 deletion python/lsst/pipe/base/cli/cmd/commands.py
@@ -34,8 +34,9 @@
register_dataset_types_option,
repo_argument,
transfer_dimensions_option,
transfer_option,
)
from lsst.daf.butler.cli.utils import ButlerCommand
from lsst.daf.butler.cli.utils import ButlerCommand, split_commas, unwrap

from ... import script
from ..opt import instrument_argument, update_output_chain_option
@@ -92,3 +93,61 @@ def zip_from_graph(**kwargs: Any) -> None:
"""
zip = script.zip_from_graph(**kwargs)
print(f"Zip archive written to {zip}")


@click.command(short_help="Retrieve artifacts from subset of graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(
required=True,
help="REPO is a URI to a butler configuration that is used to configure "
"the datastore of the quantum-backed butler.",
)
@click.argument("dest", required=True)
@transfer_option()
@click.option(
"--preserve-path/--no-preserve-path",
is_flag=True,
default=True,
help="Preserve the datastore path to the artifact at the destination.",
)
@click.option(
"--clobber/--no-clobber",
is_flag=True,
default=False,
help="If clobber, overwrite files if they exist locally.",
)
@click.option(
"--qgraph-node-id",
callback=split_commas,
multiple=True,
help=unwrap(
"""Only load a specified set of nodes when graph is
loaded from a file, nodes are identified by UUID
values. One or more comma-separated strings are
accepted. By default all nodes are loaded. Ignored if
graph is not loaded from a file."""
),
)
@click.option(
"--include-inputs/--no-include-inputs",
is_flag=True,
default=True,
help="Whether to include input datasets in retrieval.",
)
@click.option(
"--include-outputs/--no-include-outputs",
is_flag=True,
default=True,
help="Whether to include outut datasets in retrieval.",
)
@options_file_option()
def retrieve_artifacts_for_quanta(**kwargs: Any) -> None:
"""Retrieve artifacts from given quanta defined in quantum graph.
GRAPH is a URI to the source quantum graph file to use when building the
Zip archive.
DEST is a directory to write the Zip archive.
"""
artifacts = script.retrieve_artifacts_for_quanta(**kwargs)
print(f"Written {len(artifacts)} artifacts to {kwargs['dest']}.")
1 change: 1 addition & 0 deletions python/lsst/pipe/base/cli/resources.yaml
@@ -4,3 +4,4 @@ cmd:
- register-instrument
- transfer-from-graph
- zip-from-graph
- retrieve-artifacts-for-quanta
156 changes: 154 additions & 2 deletions python/lsst/pipe/base/graph/graph.py
@@ -44,6 +44,7 @@
from types import MappingProxyType
from typing import Any, BinaryIO, TypeVar

import lsst.utils.logging
import networkx as nx
from lsst.daf.butler import (
Config,
@@ -75,6 +76,7 @@
from .quantumNode import BuildId, QuantumNode

_T = TypeVar("_T", bound="QuantumGraph")
_LOG = lsst.utils.logging.getLogger(__name__)

# modify this constant any time the on disk representation of the save file
# changes, and update the load helpers to behave properly for each version.
@@ -909,7 +911,7 @@ def loadUri(
cls,
uri: ResourcePathExpression,
universe: DimensionUniverse | None = None,
nodes: Iterable[uuid.UUID] | None = None,
nodes: Iterable[uuid.UUID | str] | None = None,
graphID: BuildId | None = None,
minimumVersion: int = 3,
) -> QuantumGraph:
@@ -924,7 +926,7 @@
saved structure. If supplied, the
`~lsst.daf.butler.DimensionUniverse` from the loaded `QuantumGraph`
will be validated against the supplied argument for compatibility.
nodes : iterable of `uuid.UUID` or `None`
nodes : iterable of [ `uuid.UUID` | `str` ] or `None`
UUIDs that correspond to nodes in the graph. If specified, only
these nodes will be loaded. Defaults to None, in which case all
nodes will be loaded.
@@ -1656,3 +1658,153 @@ def init_output_run(self, butler: LimitedButler, existing: bool = True) -> None:
self.write_configs(butler, compare_existing=existing)
self.write_packages(butler, compare_existing=existing)
self.write_init_outputs(butler, skip_existing=existing)

def get_refs(
self,
*,
include_init_inputs: bool = False,
include_inputs: bool = False,
include_intermediates: bool | None = None,
include_init_outputs: bool = False,
include_outputs: bool = False,
conform_outputs: bool = True,
) -> tuple[set[DatasetRef], dict[str, DatastoreRecordData]]:
"""Get the requested dataset refs from the graph.
Parameters
----------
include_init_inputs : `bool`, optional
Include init inputs.
include_inputs : `bool`, optional
Include inputs.
include_intermediates : `bool` or `None`, optional
If `None`, no special handling for intermediates is performed.
If `True`, intermediates are calculated even if other flags
do not request datasets. If `False`, intermediates will be removed
from any results.
include_init_outputs : `bool`, optional
Include init outputs.
include_outputs : `bool`, optional
Include outputs.
conform_outputs : `bool`, optional
Whether any outputs found should have their dataset types conformed
with the registry dataset types.

Returns
-------
refs : `set` [ `lsst.daf.butler.DatasetRef` ]
The requested dataset refs found in the graph.
datastore_records : `dict` [ `str`, \
`lsst.daf.butler.datastore.record_data.DatastoreRecordData` ]
Any datastore records found.

Notes
-----
Conforming and requesting inputs and outputs can result in the same
dataset appearing in the results twice with differing storage classes.
"""
datastore_records: dict[str, DatastoreRecordData] = {}
init_input_refs: set[DatasetRef] = set()
init_output_refs: set[DatasetRef] = set(self.globalInitOutputRefs())

if include_intermediates is True:
# Need to enable inputs and outputs even if not explicitly
# requested.
request_include_init_inputs = True
request_include_inputs = True
request_include_init_outputs = True
request_include_outputs = True
else:
request_include_init_inputs = include_init_inputs
request_include_inputs = include_inputs
request_include_init_outputs = include_init_outputs
request_include_outputs = include_outputs

if request_include_init_inputs or request_include_init_outputs:
for task_def in self.iterTaskGraph():
if request_include_init_inputs:
if in_refs := self.initInputRefs(task_def):
init_input_refs.update(in_refs)
if request_include_init_outputs:
if out_refs := self.initOutputRefs(task_def):
init_output_refs.update(out_refs)

input_refs: set[DatasetRef] = set()
output_refs: set[DatasetRef] = set()

for qnode in self:
if request_include_inputs:
for other_refs in qnode.quantum.inputs.values():
input_refs.update(other_refs)
# Inputs can come with datastore records.
for store_name, records in qnode.quantum.datastore_records.items():
datastore_records.setdefault(store_name, DatastoreRecordData()).update(records)
if request_include_outputs:
for other_refs in qnode.quantum.outputs.values():
output_refs.update(other_refs)

# Intermediates are the intersection of inputs and outputs. Must do
# this analysis before conforming since dataset type changes will
# change set membership.
inter_msg = ""
intermediates = set()
if include_intermediates is not None:
intermediates = (input_refs | init_input_refs) & (output_refs | init_output_refs)

if include_intermediates is False:
# Remove intermediates from results.
init_input_refs -= intermediates
input_refs -= intermediates
init_output_refs -= intermediates
output_refs -= intermediates
inter_msg = f"; Intermediates removed: {len(intermediates)}"
intermediates = set()
elif include_intermediates is True:
# Do not mention intermediates if all the input/output flags
# would have resulted in them anyhow.
if (
(request_include_init_inputs is not include_init_inputs)
or (request_include_inputs is not include_inputs)
or (request_include_init_outputs is not include_init_outputs)
or (request_include_outputs is not include_outputs)
):
inter_msg = f"; including intermediates: {len(intermediates)}"

# Assign intermediates to the relevant category.
if not include_init_inputs:
init_input_refs &= intermediates
if not include_inputs:
input_refs &= intermediates
if not include_init_outputs:
init_output_refs &= intermediates
if not include_outputs:
output_refs &= intermediates

# Conforming can result in an input ref and an output ref appearing
# in the returned results that are identical apart from storage class.
if conform_outputs:
# Get data repository definitions from the QuantumGraph; these can
# have different storage classes than those in the quanta.
dataset_types = {dstype.name: dstype for dstype in self.registryDatasetTypes()}

def _update_ref(ref: DatasetRef) -> DatasetRef:
internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType)
if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name:
ref = ref.replace(storage_class=internal_dataset_type.storageClass_name)
return ref

# Convert output_refs to the data repository storage classes, too.
output_refs = {_update_ref(ref) for ref in output_refs}
init_output_refs = {_update_ref(ref) for ref in init_output_refs}

_LOG.verbose(
"Found the following datasets. InitInputs: %d; Inputs: %d; InitOutputs: %s; Outputs: %d%s",
len(init_input_refs),
len(input_refs),
len(init_output_refs),
len(output_refs),
inter_msg,
)

refs = input_refs | init_input_refs | init_output_refs | output_refs
return refs, datastore_records
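
For illustration, a hypothetical usage sketch of the new method (the graph file name is a placeholder):

from lsst.pipe.base import QuantumGraph

# Load a previously saved graph; the dimension universe defaults to the
# one stored in the file.
qgraph = QuantumGraph.loadUri("pipeline.qgraph")

# Collect only output datasets, conformed to the registry dataset types.
# Inputs and init-inputs default to False, per the signature above.
refs, datastore_records = qgraph.get_refs(
    include_init_outputs=True,
    include_outputs=True,
    conform_outputs=True,
)
print(f"Found {len(refs)} refs; datastore records from {sorted(datastore_records)}")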
1 change: 1 addition & 0 deletions python/lsst/pipe/base/script/__init__.py
@@ -26,5 +26,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .register_instrument import register_instrument
from .retrieve_artifacts_for_quanta import retrieve_artifacts_for_quanta
from .transfer_from_graph import transfer_from_graph
from .zip_from_graph import zip_from_graph
(Diffs for the remaining 4 changed files not shown.)
