Add visualization-only mode for PipelineGraph resolution.
This does the best it can at resolving the graph so we can display
dimensions and [most] storage classes without a butler.
TallJimbo committed Jun 6, 2024
1 parent 580c0df commit d0a4bcc
Showing 4 changed files with 116 additions and 61 deletions.
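A minimal end-to-end sketch of the new mode (hedged: the pipeline path is hypothetical, and `Pipeline.from_uri` plus the `PipelineGraph.dataset_types` accessors are assumed to behave as in current `pipe_base`):

```python
# Resolve a pipeline graph for display without a butler or Registry.
# "my_pipeline.yaml" is a hypothetical pipeline definition file.
from lsst.pipe.base import Pipeline

pipeline = Pipeline.from_uri("my_pipeline.yaml")
graph = pipeline.to_graph(visualization_only=True)  # no Registry needed
for name, node in graph.dataset_types.items():
    # Dimensions are resolved as well as possible; storage classes that
    # could not be determined fall back to the "<UNKNOWN>" placeholder.
    print(name, node.dimensions, node.storage_class_name)
```

With a real `Registry` the graph resolves as before; `visualization_only=True` relaxes the errors that would otherwise require registered dataset types.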
18 changes: 13 additions & 5 deletions python/lsst/pipe/base/pipeline.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining Pipeline class and related methods.
"""
"""Module defining Pipeline class and related methods."""

from __future__ import annotations

@@ -820,7 +819,9 @@ def write_to_uri(self, uri: ResourcePathExpression) -> None:
"""
self._pipelineIR.write_to_uri(uri)

def to_graph(self, registry: Registry | None = None) -> pipeline_graph.PipelineGraph:
def to_graph(
self, registry: Registry | None = None, visualization_only: bool = False
) -> pipeline_graph.PipelineGraph:
"""Construct a pipeline graph from this pipeline.
Constructing a graph applies all configuration overrides, freezes all
@@ -833,6 +834,13 @@ def to_graph(self, registry: Registry | None = None) -> pipeline_graph.PipelineGraph:
registry : `lsst.daf.butler.Registry`, optional
Data repository client. If provided, the graph's dataset types
and dimensions will be resolved (see `PipelineGraph.resolve`).
visualization_only : `bool`
Resolve the graph as well as possible even when dimensions and
storage classes cannot really be determined. This can include
using the ``universe.commonSkyPix`` as the assumed dimensions of
connections that use the "skypix" placeholder and using "<UNKNOWN>"
as a storage class name (which will fail if the storage class
itself is ever actually loaded).
Returns
-------
@@ -865,8 +873,8 @@ def to_graph(self, registry: Registry | None = None) -> pipeline_graph.PipelineGraph:
label, subset.subset, subset.description if subset.description is not None else ""
)
graph.sort()
if registry is not None:
graph.resolve(registry)
if registry is not None or visualization_only:
graph.resolve(registry=registry, visualization_only=visualization_only)
return graph

# TODO: remove on DM-40443.
13 changes: 11 additions & 2 deletions python/lsst/pipe/base/pipeline_graph/_dataset_types.py
@@ -89,6 +89,7 @@ def _from_edges(
get_registered: Callable[[str], DatasetType | None],
dimensions: DimensionUniverse,
previous: DatasetTypeNode | None,
visualization_only: bool = False,
) -> DatasetTypeNode:
"""Construct a dataset type node from its edges.
@@ -99,22 +100,29 @@
object in the internal networkx graph.
xgraph : `networkx.MultiDiGraph`
The internal networkx graph.
get_registered : `~collections.abc.Callable`
get_registered : `~collections.abc.Callable` or `None`
Callable that takes a dataset type name and returns the
`DatasetType` registered in the data repository, or `None` if it is
not registered. If the callable itself is `None`, no dataset types
are considered registered.
dimensions : `lsst.daf.butler.DimensionUniverse`
Definitions of all dimensions.
previous : `DatasetTypeNode` or `None`
Previous node for this dataset type.
visualization_only : `bool`
Resolve the graph as well as possible even when dimensions and
storage classes cannot really be determined. This can include
using the ``universe.commonSkyPix`` as the assumed dimensions of
connections that use the "skypix" placeholder and using "<UNKNOWN>"
as a storage class name (which will fail if the storage class
itself is ever actually loaded).
Returns
-------
node : `DatasetTypeNode`
Node consistent with all edges pointing to it and the data
repository.
"""
dataset_type = get_registered(key.name)
dataset_type = get_registered(key.name) if get_registered is not None else None
is_registered = dataset_type is not None
if previous is not None and previous.dataset_type == dataset_type:
# This node was already resolved (with exactly the same edges
@@ -160,6 +168,7 @@ def _from_edges(
is_registered=is_registered,
producer=producer,
consumers=consumers,
visualization_only=visualization_only,
)
consumers.append(consuming_edge.task_label)
assert dataset_type is not None, "Graph structure guarantees at least one edge."
99 changes: 61 additions & 38 deletions python/lsst/pipe/base/pipeline_graph/_edges.py
@@ -470,6 +470,7 @@ def _resolve_dataset_type(
producer: str | None,
consumers: Sequence[str],
is_registered: bool,
visualization_only: bool,
) -> tuple[DatasetType, bool, bool]:
"""Participate in the construction of the `DatasetTypeNode` object
associated with this edge.
@@ -499,6 +500,13 @@
is_registered : `bool`
Whether a registration for this dataset type was found in the
data repository.
visualization_only : `bool`
Resolve the graph as well as possible even when dimensions and
storage classes cannot really be determined. This can include
using the ``universe.commonSkyPix`` as the assumed dimensions of
connections that use the "skypix" placeholder and using "<UNKNOWN>"
as a storage class name (which will fail if the storage class
itself is ever actually loaded).
Returns
-------
@@ -528,22 +536,28 @@
"""
if "skypix" in self.raw_dimensions:
if current is None:
raise MissingDatasetTypeError(
f"DatasetType '{self.dataset_type_name}' referenced by "
f"{self.task_label!r} uses 'skypix' as a dimension "
f"placeholder, but has not been registered with the data repository. "
f"Note that reference catalog names are now used as the dataset "
f"type name instead of 'ref_cat'."
)
rest1 = set(universe.conform(self.raw_dimensions - {"skypix"}).names)
rest2 = current.dimensions.names - current.dimensions.skypix.names
if rest1 != rest2:
raise IncompatibleDatasetTypeError(
f"Non-skypix dimensions for dataset type {self.dataset_type_name} declared in "
f"connections ({rest1}) are inconsistent with those in "
f"registry's version of this dataset ({rest2})."
)
dimensions = current.dimensions.as_group()
if visualization_only:
dimensions = universe.conform(
[d if d != "skypix" else universe.commonSkyPix.name for d in self.raw_dimensions]
)
else:
raise MissingDatasetTypeError(
f"DatasetType '{self.dataset_type_name}' referenced by "
f"{self.task_label!r} uses 'skypix' as a dimension "
f"placeholder, but has not been registered with the data repository. "
f"Note that reference catalog names are now used as the dataset "
f"type name instead of 'ref_cat'."
)
else:
rest1 = set(universe.conform(self.raw_dimensions - {"skypix"}).names)
rest2 = current.dimensions.names - current.dimensions.skypix.names
if rest1 != rest2:
raise IncompatibleDatasetTypeError(
f"Non-skypix dimensions for dataset type {self.dataset_type_name} declared in "
f"connections ({rest1}) are inconsistent with those in "
f"registry's version of this dataset ({rest2})."
)
dimensions = current.dimensions.as_group()
else:
dimensions = universe.conform(self.raw_dimensions)
is_initial_query_constraint = is_initial_query_constraint and not self.defer_query_constraint
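As a standalone illustration of the substitution above, assuming only the default `lsst.daf.butler` dimension universe (the connection dimensions here are hypothetical):

```python
# How the "skypix" placeholder is replaced in visualization-only mode:
# commonSkyPix stands in for the unknown skypix dimension.
from lsst.daf.butler import DimensionUniverse

universe = DimensionUniverse()  # default universe; no data repository needed
raw_dimensions = {"band", "skypix"}  # hypothetical connection dimensions
dimensions = universe.conform(
    [d if d != "skypix" else universe.commonSkyPix.name for d in raw_dimensions]
)
print(dimensions)  # e.g. {band, htm7} when commonSkyPix is htm7
```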
@@ -576,28 +590,37 @@ def report_current_origin() -> str:

if self.component is not None:
if current is None:
raise MissingDatasetTypeError(
f"Dataset type {self.parent_dataset_type_name!r} is not registered and not produced by "
f"this pipeline, but it used by task {self.task_label!r}, via component "
f"{self.component!r}. This pipeline cannot be resolved until the parent dataset type is "
"registered."
)
all_current_components = current.storageClass.allComponents()
if self.component not in all_current_components:
raise IncompatibleDatasetTypeError(
f"Dataset type {self.parent_dataset_type_name!r} has storage class "
f"{current.storageClass_name!r} (from {report_current_origin()}), "
f"which does not include component {self.component!r} "
f"as requested by task {self.task_label!r}."
)
if all_current_components[self.component].name != self.storage_class_name:
raise IncompatibleDatasetTypeError(
f"Dataset type '{self.parent_dataset_type_name}.{self.component}' has storage class "
f"{all_current_components[self.component].name!r} "
f"(from {report_current_origin()}), which does not match "
f"{self.storage_class_name!r}, as requested by task {self.task_label!r}. "
"Note that storage class conversions of components are not supported."
)
if visualization_only:
current = DatasetType(
self.parent_dataset_type_name,
dimensions,
storageClass="<UNKNOWN>",
isCalibration=self.is_calibration,
)
else:
raise MissingDatasetTypeError(
f"Dataset type {self.parent_dataset_type_name!r} is not registered and not produced "
f"by this pipeline, but it used by task {self.task_label!r}, via component "
f"{self.component!r}. This pipeline cannot be resolved until the parent dataset "
"type is registered."
)
else:
all_current_components = current.storageClass.allComponents()
if self.component not in all_current_components:
raise IncompatibleDatasetTypeError(
f"Dataset type {self.parent_dataset_type_name!r} has storage class "
f"{current.storageClass_name!r} (from {report_current_origin()}), "
f"which does not include component {self.component!r} "
f"as requested by task {self.task_label!r}."
)
if all_current_components[self.component].name != self.storage_class_name:
raise IncompatibleDatasetTypeError(
f"Dataset type '{self.parent_dataset_type_name}.{self.component}' has storage class "
f"{all_current_components[self.component].name!r} "
f"(from {report_current_origin()}), which does not match "
f"{self.storage_class_name!r}, as requested by task {self.task_label!r}. "
"Note that storage class conversions of components are not supported."
)
return current, is_initial_query_constraint, is_prerequisite
else:
dataset_type = DatasetType(
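The "<UNKNOWN>" placeholder relies on storage classes being loaded lazily; a sketch of the failure mode the docstrings warn about, with a hypothetical dataset type name:

```python
# A DatasetType built with the "<UNKNOWN>" placeholder can be constructed
# and printed, but fails if the storage class itself is ever loaded.
from lsst.daf.butler import DatasetType, DimensionUniverse

universe = DimensionUniverse()
dt = DatasetType(
    "example_coadd",  # hypothetical dataset type name
    universe.conform(["tract", "patch", "band"]),
    storageClass="<UNKNOWN>",
)
print(dt.storageClass_name)  # fine: just returns the stored string
dt.storageClass  # fails: "<UNKNOWN>" is not a registered storage class
```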
47 changes: 31 additions & 16 deletions python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py
@@ -511,6 +511,7 @@ def resolve(
registry: Registry | None = None,
dimensions: DimensionUniverse | None = None,
dataset_types: Mapping[str, DatasetType] | None = None,
visualization_only: bool = False,
) -> None:
"""Resolve all dimensions and dataset types and check them for
consistency.
@@ -520,13 +521,23 @@
Parameters
----------
registry : `lsst.daf.butler.Registry`, optional
Client for the data repository to resolve against. If not
provided, both ``dimensions`` and ``dataset_types`` must be.
Client for the data repository to resolve against.
dimensions : `lsst.daf.butler.DimensionUniverse`, optional
Definitions for all dimensions.
Definitions for all dimensions. Takes precedence over
``registry.dimensions`` if both are provided. If neither is
provided, defaults to the default dimension universe
(``lsst.daf.butler.DimensionUniverse()``).
dataset_types : `~collections.abc.Mapping` [ `str`, \
`~lsst.daf.butler.DatasetType` ], optional
Mapping of dataset types to consider registered.
Mapping of dataset types to consider registered. Takes precedence
over ``registry.getDatasetType()`` if both are provided.
visualization_only : `bool`
Resolve the graph as well as possible even when dimensions and
storage classes cannot really be determined. This can include
using the ``universe.commonSkyPix`` as the assumed dimensions of
connections that use the "skypix" placeholder and using "<UNKNOWN>"
as a storage class name (which will fail if the storage class
itself is ever actually loaded).
Notes
-----
@@ -556,28 +567,27 @@
Raised if ``check_edges_unchanged=True`` and the edges of a task do
change after import and reconfiguration.
"""
if registry is None and (dimensions is None or dataset_types is None):
raise PipelineGraphError(
"Either 'registry' or both 'dimensions' and 'dataset_types' "
"must be passed to PipelineGraph.resolve."
)

get_registered: Callable[[str], DatasetType | None]
get_registered: Callable[[str], DatasetType | None] | None = None
if dataset_types is not None:
# Ruff seems confused about whether this is used below; it is!
get_registered = dataset_types.get
else:
assert registry is not None
elif registry is not None:

def get_registered(name: str) -> DatasetType | None:
try:
return registry.getDatasetType(name)
except MissingDatasetTypeError:
return None
else:

def get_registered(name: str) -> None:
return None

if dimensions is None:
assert registry is not None
dimensions = registry.dimensions
if registry is not None:
dimensions = registry.dimensions
else:
dimensions = DimensionUniverse()

node_key: NodeKey
updates: dict[NodeKey, TaskNode | DatasetTypeNode] = {}
Expand All @@ -591,7 +601,12 @@ def get_registered(name: str) -> DatasetType | None:
case NodeType.DATASET_TYPE:
dataset_type_node: DatasetTypeNode | None = node_state["instance"]
new_dataset_type_node = DatasetTypeNode._from_edges(
node_key, self._xgraph, get_registered, dimensions, previous=dataset_type_node
node_key,
self._xgraph,
get_registered,
dimensions,
previous=dataset_type_node,
visualization_only=visualization_only,
)
# Usage of ``is`` here is intentional; `_from_edges` returns
# `previous=dataset_type_node` if it can determine that it
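The three-way `get_registered` fallback above can be read as a generic pattern; a self-contained sketch with simplified stand-in types (not the real `DatasetType`/`Registry` signatures):

```python
# resolve()'s lookup precedence: an explicit mapping wins, then a
# repository lookup, then "nothing is registered" (visualization-only).
from collections.abc import Callable, Mapping


def pick_lookup(
    dataset_types: Mapping[str, object] | None,
    registry_get: Callable[[str], object | None] | None,
) -> Callable[[str], object | None]:
    if dataset_types is not None:
        return dataset_types.get  # explicit mapping takes precedence
    if registry_get is not None:
        return registry_get  # fall back to the data repository client
    return lambda name: None  # treat every dataset type as unregistered


get_registered = pick_lookup(None, None)
assert get_registered("anything") is None
```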
