DM-47328: Experimental command to copy files based on a graph #454

Merged 6 commits on Nov 5, 2024
Changes from all commits
2 changes: 2 additions & 0 deletions .github/workflows/build.yaml
@@ -59,6 +59,8 @@ jobs:
run: |
pytest -r a -v -n 3 --cov=lsst.pipe.base --cov=tests --cov-report=xml --cov-report=term --cov-branch
butler register-instrument -h
butler transfer-from-graph -h
butler retrieve-artifacts-for-quanta -h
- name: Upload coverage to codecov
uses: codecov/codecov-action@v4
with:
2 changes: 2 additions & 0 deletions doc/changes/DM-47328.feature.rst
@@ -0,0 +1,2 @@
* Added a new command-line command ``butler retrieve-artifacts-for-quanta`` that can be used to retrieve input or output datasets associated with a graph or with specific quanta.
* Added a new ``QuantumGraph.get_refs()`` method to retrieve dataset refs from a graph.
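A minimal sketch of the new ``QuantumGraph.get_refs()`` API described above (the graph path is hypothetical; the keyword arguments follow the signature added later in this diff):

from lsst.pipe.base import QuantumGraph

# Load a previously saved quantum graph (hypothetical path).
qgraph = QuantumGraph.loadUri("/path/to/pipeline.qgraph")

# Collect only output datasets, dropping anything that is also consumed
# within the graph (intermediates).
refs, _ = qgraph.get_refs(
    include_outputs=True,
    include_init_outputs=True,
    include_intermediates=False,
)
print(f"Graph produces {len(refs)} non-intermediate outputs.")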
4 changes: 2 additions & 2 deletions python/lsst/pipe/base/cli/cmd/__init__.py
@@ -25,6 +25,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph"]
__all__ = ["register_instrument", "transfer_from_graph", "zip_from_graph", "retrieve_artifacts_for_quanta"]

from .commands import register_instrument, transfer_from_graph, zip_from_graph
from .commands import register_instrument, retrieve_artifacts_for_quanta, transfer_from_graph, zip_from_graph
61 changes: 60 additions & 1 deletion python/lsst/pipe/base/cli/cmd/commands.py
@@ -34,8 +34,9 @@
register_dataset_types_option,
repo_argument,
transfer_dimensions_option,
transfer_option,
)
from lsst.daf.butler.cli.utils import ButlerCommand
from lsst.daf.butler.cli.utils import ButlerCommand, split_commas, unwrap

from ... import script
from ..opt import instrument_argument, update_output_chain_option
@@ -92,3 +93,61 @@
"""
zip = script.zip_from_graph(**kwargs)
print(f"Zip archive written to {zip}")


@click.command(short_help="Retrieve artifacts from subset of graph.", cls=ButlerCommand)
@click.argument("graph", required=True)
@repo_argument(
required=True,
help="REPO is a URI to a butler configuration that is used to configure "
"the datastore of the quantum-backed butler.",
)
@click.argument("dest", required=True)
@transfer_option()
@click.option(
"--preserve-path/--no-preserve-path",
is_flag=True,
default=True,
help="Preserve the datastore path to the artifact at the destination.",
)
@click.option(
"--clobber/--no-clobber",
is_flag=True,
default=False,
help="If clobber, overwrite files if they exist locally.",
)
@click.option(
"--qgraph-node-id",
callback=split_commas,
multiple=True,
help=unwrap(
"""Only load a specified set of nodes when graph is
loaded from a file; nodes are identified by UUID
values. One or more comma-separated strings are
accepted. By default all nodes are loaded. Ignored if
graph is not loaded from a file."""
),
)
@click.option(
"--include-inputs/--no-include-inputs",
is_flag=True,
default=True,
help="Whether to include input datasets in retrieval.",
)
@click.option(
"--include-outputs/--no-include-outputs",
is_flag=True,
default=True,
help="Whether to include outut datasets in retrieval.",
)
@options_file_option()
def retrieve_artifacts_for_quanta(**kwargs: Any) -> None:
"""Retrieve artifacts from given quanta defined in quantum graph.

GRAPH is a URI to the source quantum graph file to use when building the
Zip archive.

DEST is a directory to write the Zip archive.
"""
artifacts = script.retrieve_artifacts_for_quanta(**kwargs)
print(f"Written {len(artifacts)} artifacts to {kwargs['dest']}.")

1 change: 1 addition & 0 deletions python/lsst/pipe/base/cli/resources.yaml
@@ -4,3 +4,4 @@ cmd:
- register-instrument
- transfer-from-graph
- zip-from-graph
- retrieve-artifacts-for-quanta
156 changes: 154 additions & 2 deletions python/lsst/pipe/base/graph/graph.py
@@ -44,6 +44,7 @@
from types import MappingProxyType
from typing import Any, BinaryIO, TypeVar

import lsst.utils.logging
import networkx as nx
from lsst.daf.butler import (
Config,
@@ -75,6 +76,7 @@
from .quantumNode import BuildId, QuantumNode

_T = TypeVar("_T", bound="QuantumGraph")
_LOG = lsst.utils.logging.getLogger(__name__)

# modify this constant any time the on disk representation of the save file
# changes, and update the load helpers to behave properly for each version.
@@ -909,7 +911,7 @@
cls,
uri: ResourcePathExpression,
universe: DimensionUniverse | None = None,
nodes: Iterable[uuid.UUID] | None = None,
nodes: Iterable[uuid.UUID | str] | None = None,
graphID: BuildId | None = None,
minimumVersion: int = 3,
) -> QuantumGraph:
@@ -924,7 +926,7 @@
saved structure. If supplied, the
`~lsst.daf.butler.DimensionUniverse` from the loaded `QuantumGraph`
will be validated against the supplied argument for compatibility.
nodes : iterable of `uuid.UUID` or `None`
nodes : iterable of [ `uuid.UUID` | `str` ] or `None`
UUIDs that correspond to nodes in the graph. If specified, only
these nodes will be loaded. Defaults to None, in which case all
nodes will be loaded.
Expand Down Expand Up @@ -1656,3 +1658,153 @@
self.write_configs(butler, compare_existing=existing)
self.write_packages(butler, compare_existing=existing)
self.write_init_outputs(butler, skip_existing=existing)

def get_refs(
self,
*,
include_init_inputs: bool = False,
include_inputs: bool = False,
include_intermediates: bool | None = None,
include_init_outputs: bool = False,
include_outputs: bool = False,
conform_outputs: bool = True,
) -> tuple[set[DatasetRef], dict[str, DatastoreRecordData]]:
"""Get the requested dataset refs from the graph.

Parameters
----------
include_init_inputs : `bool`, optional
Include init inputs.
include_inputs : `bool`, optional
Include inputs.
include_intermediates : `bool` or `None`, optional
If `None`, no special handling for intermediates is performed.
If `True` intermediates are calculated even if other flags
do not request datasets. If `False` intermediates will be removed
from any results.
include_init_outputs : `bool`, optional
Include init outputs.
include_outputs : `bool`, optional
Include outputs.
conform_outputs : `bool`, optional
Whether any outputs found should have their dataset types conformed
with the registry dataset types.

Returns
-------
refs : `set` [ `lsst.daf.butler.DatasetRef` ]
The requested dataset refs found in the graph.
datastore_records : `dict` [ `str`, \
`lsst.daf.butler.datastore.record_data.DatastoreRecordData` ]
Any datastore records found.

Notes
-----
Conforming and requesting inputs and outputs can result in the same
dataset appearing in the results twice with differing storage classes.
"""
datastore_records: dict[str, DatastoreRecordData] = {}
init_input_refs: set[DatasetRef] = set()
init_output_refs: set[DatasetRef] = set(self.globalInitOutputRefs())

if include_intermediates is True:
# Need to enable inputs and outputs even if not explicitly
# requested.
request_include_init_inputs = True
request_include_inputs = True
request_include_init_outputs = True
request_include_outputs = True
else:
request_include_init_inputs = include_init_inputs
request_include_inputs = include_inputs
request_include_init_outputs = include_init_outputs
request_include_outputs = include_outputs

if request_include_init_inputs or request_include_init_outputs:
for task_def in self.iterTaskGraph():
if request_include_init_inputs:
if in_refs := self.initInputRefs(task_def):
init_input_refs.update(in_refs)
if request_include_init_outputs:
if out_refs := self.initOutputRefs(task_def):
init_output_refs.update(out_refs)

input_refs: set[DatasetRef] = set()
output_refs: set[DatasetRef] = set()

for qnode in self:
if request_include_inputs:
for other_refs in qnode.quantum.inputs.values():
input_refs.update(other_refs)
# Inputs can come with datastore records.
for store_name, records in qnode.quantum.datastore_records.items():
datastore_records.setdefault(store_name, DatastoreRecordData()).update(records)

if request_include_outputs:
for other_refs in qnode.quantum.outputs.values():
output_refs.update(other_refs)

# Intermediates are the intersection of inputs and outputs. Must do
# this analysis before conforming since dataset type changes will
# change set membership.
inter_msg = ""
intermediates = set()
if include_intermediates is not None:
intermediates = (input_refs | init_input_refs) & (output_refs | init_output_refs)

if include_intermediates is False:
# Remove intermediates from results.
init_input_refs -= intermediates
input_refs -= intermediates
init_output_refs -= intermediates
output_refs -= intermediates
inter_msg = f"; Intermediates removed: {len(intermediates)}"
intermediates = set()
elif include_intermediates is True:
# Do not mention intermediates if all the input/output flags
# would have resulted in them anyhow.
if (
(request_include_init_inputs is not include_init_inputs)
or (request_include_inputs is not include_inputs)
or (request_include_init_outputs is not include_init_outputs)
or (request_include_outputs is not include_outputs)
):
inter_msg = f"; including intermediates: {len(intermediates)}"

# Assign intermediates to the relevant category.
if not include_init_inputs:
init_input_refs &= intermediates
if not include_inputs:
input_refs &= intermediates
if not include_init_outputs:
init_output_refs &= intermediates
if not include_outputs:
output_refs &= intermediates

# Conforming can result in an input ref and an output ref appearing
# in the returned results that are identical apart from storage class.
if conform_outputs:
# Get data repository definitions from the QuantumGraph; these can
# have different storage classes than those in the quanta.
dataset_types = {dstype.name: dstype for dstype in self.registryDatasetTypes()}

def _update_ref(ref: DatasetRef) -> DatasetRef:
internal_dataset_type = dataset_types.get(ref.datasetType.name, ref.datasetType)
if internal_dataset_type.storageClass_name != ref.datasetType.storageClass_name:
ref = ref.replace(storage_class=internal_dataset_type.storageClass_name)

return ref

# Convert output_refs to the data repository storage classes, too.
output_refs = {_update_ref(ref) for ref in output_refs}
init_output_refs = {_update_ref(ref) for ref in init_output_refs}

_LOG.verbose(
"Found the following datasets. InitInputs: %d; Inputs: %d; InitOutputs: %s; Outputs: %d%s",
len(init_input_refs),
len(input_refs),
len(init_output_refs),
len(output_refs),
inter_msg,
)

refs = input_refs | init_input_refs | init_output_refs | output_refs
return refs, datastore_records
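A short usage sketch tying this method to the loadUri() change above (the URI and node UUID are hypothetical):

from lsst.pipe.base import QuantumGraph

# String UUIDs are now accepted directly when selecting nodes to load.
qgraph = QuantumGraph.loadUri(
    "/path/to/pipeline.qgraph",
    nodes=["5647cba7-1731-4b6c-9b32-1a2c3d4e5f60"],  # hypothetical quantum ID
)

# Gather inputs and outputs for the selected quanta, plus any datastore
# records that a quantum-backed butler would need to locate the inputs.
refs, datastore_records = qgraph.get_refs(
    include_init_inputs=True,
    include_inputs=True,
    include_init_outputs=True,
    include_outputs=True,
    conform_outputs=True,
)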
1 change: 1 addition & 0 deletions python/lsst/pipe/base/script/__init__.py
@@ -26,5 +26,6 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .register_instrument import register_instrument
from .retrieve_artifacts_for_quanta import retrieve_artifacts_for_quanta
from .transfer_from_graph import transfer_from_graph
from .zip_from_graph import zip_from_graph