Skip to content

Commit

Permalink
Use new Butler query methods (DM-41761)
Browse files Browse the repository at this point in the history
  • Loading branch information
andy-slac committed Dec 5, 2023
1 parent cdd3437 commit 7ead70c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v4.5.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black
rev: 23.7.0
rev: 23.11.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -22,6 +22,6 @@ repos:
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.0.282
rev: v0.1.6
hooks:
- id: ruff
35 changes: 19 additions & 16 deletions python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,12 @@
import dataclasses
from collections.abc import Iterator, Mapping
from contextlib import contextmanager
from typing import Any, final
from typing import TYPE_CHECKING, Any, final

from lsst.daf.butler import Butler, DimensionGroup
from lsst.daf.butler.registry import MissingDatasetTypeError
from lsst.daf.butler.registry.queries import DataCoordinateQueryResults
from lsst.utils.logging import LsstLogAdapter
from lsst.utils.timer import timeMethod

from ._datasetQueryConstraints import DatasetQueryConstraintVariant
from .pipeline_graph import DatasetTypeNode, PipelineGraph, TaskNode
from .quantum_graph_builder import (
DatasetKey,
PrerequisiteDatasetKey,
Expand All @@ -55,6 +51,12 @@
QuantumKey,
)

if TYPE_CHECKING:
from lsst.daf.butler import Butler, DataCoordinateQueryResults, DimensionGroup
from lsst.utils.logging import LsstLogAdapter

from .pipeline_graph import DatasetTypeNode, PipelineGraph, TaskNode


@final
class AllDimensionsQuantumGraphBuilder(QuantumGraphBuilder):
Expand Down Expand Up @@ -228,7 +230,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
# to find these.
count = 0
try:
for ref in data_ids.findDatasets(dataset_type_node.name, self.input_collections):
for ref in data_ids.find_datasets(dataset_type_node.name, self.input_collections):
self.existing_datasets.inputs[
DatasetKey(dataset_type_node.name, ref.dataId.required_values)
] = ref
Expand All @@ -245,7 +247,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
# that we might skip...
count = 0
try:
for ref in data_ids.findDatasets(dataset_type_node.name, self.skip_existing_in):
for ref in data_ids.find_datasets(dataset_type_node.name, self.skip_existing_in):
key = DatasetKey(dataset_type_node.name, ref.dataId.required_values)
self.existing_datasets.outputs_for_skip[key] = ref
count += 1
Expand All @@ -265,7 +267,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
# previous block).
count = 0
try:
for ref in data_ids.findDatasets(dataset_type_node.name, [self.output_run]):
for ref in data_ids.find_datasets(dataset_type_node.name, [self.output_run]):
self.existing_datasets.outputs_in_the_way[
DatasetKey(dataset_type_node.name, ref.dataId.required_values)
] = ref
Expand Down Expand Up @@ -335,7 +337,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
# IDs to the datasets we're looking for.
count = 0
try:
query_results = data_ids.findRelatedDatasets(
query_results = data_ids.find_related_datasets(
finder.dataset_type_node.dataset_type, self.input_collections
)
except MissingDatasetTypeError:
Expand Down Expand Up @@ -455,7 +457,7 @@ def from_builder(
result.query_args = {
"dimensions": dimensions,
"where": builder.where,
"dataId": result.subgraph.data_id,
"data_id": result.subgraph.data_id,
"bind": builder.bind,
}
if builder.dataset_query_constraint == DatasetQueryConstraintVariant.ALL:
Expand Down Expand Up @@ -490,17 +492,18 @@ def from_builder(
)
builder.log.verbose("Querying for data IDs with arguments:")
builder.log.verbose(" dimensions=%s,", list(result.query_args["dimensions"].names))
builder.log.verbose(" dataId=%s,", dict(result.query_args["dataId"].required))
builder.log.verbose(" data_id=%s,", dict(result.query_args["data_id"].required))
if result.query_args["where"]:
builder.log.verbose(" where=%s,", repr(result.query_args["where"]))
if "datasets" in result.query_args:
builder.log.verbose(" datasets=%s,", list(result.query_args["datasets"]))
if "collections" in result.query_args:
builder.log.verbose(" collections=%s,", list(result.query_args["collections"]))
with builder.butler.registry.queryDataIds(**result.query_args).materialize() as common_data_ids:
builder.log.debug("Expanding data IDs.")
result.common_data_ids = common_data_ids.expanded()
yield result
with builder.butler.query() as query:
with query.data_ids(**result.query_args).materialize() as common_data_ids:
builder.log.debug("Expanding data IDs.")
result.common_data_ids = common_data_ids.expanded()
yield result

def log_failure(self, log: LsstLogAdapter) -> None:
"""Emit a series of CRITICAL-level log message that attempts to explain
Expand All @@ -519,7 +522,7 @@ def log_failure(self, log: LsstLogAdapter) -> None:
# so they can read it more easily and copy and paste into
# a Python terminal.
log.critical(" dimensions=%s,", list(self.query_args["dimensions"].names))
log.critical(" dataId=%s,", dict(self.query_args["dataId"].required))
log.critical(" data_id=%s,", dict(self.query_args["data_id"].required))
if self.query_args["where"]:
log.critical(" where=%s,", repr(self.query_args["where"]))
if "datasets" in self.query_args:
Expand Down

0 comments on commit 7ead70c

Please sign in to comment.