From d0a4bcc18979afe1b99edb2c4110503d8fa19007 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Thu, 6 Jun 2024 16:38:12 -0400
Subject: [PATCH] Add visualization-only mode for PipelineGraph resolution.

This does the best it can at resolving the graph so we can display
dimensions and [most] storage classes without a butler.
---
 python/lsst/pipe/base/pipeline.py             | 18 +++-
 .../base/pipeline_graph/_dataset_types.py     | 13 ++-
 .../lsst/pipe/base/pipeline_graph/_edges.py   | 99 ++++++++++++-------
 .../base/pipeline_graph/_pipeline_graph.py    | 47 ++++++---
 4 files changed, 116 insertions(+), 61 deletions(-)

diff --git a/python/lsst/pipe/base/pipeline.py b/python/lsst/pipe/base/pipeline.py
index f1bfe755b..c27862669 100644
--- a/python/lsst/pipe/base/pipeline.py
+++ b/python/lsst/pipe/base/pipeline.py
@@ -25,8 +25,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-"""Module defining Pipeline class and related methods.
-"""
+"""Module defining Pipeline class and related methods."""
 
 from __future__ import annotations
 
@@ -820,7 +819,9 @@ def write_to_uri(self, uri: ResourcePathExpression) -> None:
         """
         self._pipelineIR.write_to_uri(uri)
 
-    def to_graph(self, registry: Registry | None = None) -> pipeline_graph.PipelineGraph:
+    def to_graph(
+        self, registry: Registry | None = None, visualization_only: bool = False
+    ) -> pipeline_graph.PipelineGraph:
         """Construct a pipeline graph from this pipeline.
 
         Constructing a graph applies all configuration overrides, freezes all
@@ -833,6 +834,13 @@ def to_graph(self, registry: Registry | None = None) -> pipeline_graph.PipelineG
         registry : `lsst.daf.butler.Registry`, optional
             Data repository client. If provided, the graph's dataset types
             and dimensions will be resolved (see `PipelineGraph.resolve`).
+        visualization_only : `bool`, optional
+            Resolve the graph as well as possible even when dimensions and
+            storage classes cannot really be determined. This can include
+            using ``universe.commonSkyPix`` as the assumed dimensions of
+            connections that use the "skypix" placeholder and using
+            "<UNKNOWN>" as a storage class name (which will fail if the
+            storage class itself is ever actually loaded).
 
         Returns
         -------
@@ -865,8 +873,8 @@ def to_graph(self, registry: Registry | None = None) -> pipeline_graph.PipelineG
                 label, subset.subset, subset.description if subset.description is not None else ""
             )
         graph.sort()
-        if registry is not None:
-            graph.resolve(registry)
+        if registry is not None or visualization_only:
+            graph.resolve(registry=registry, visualization_only=visualization_only)
         return graph
 
     # TODO: remove on DM-40443.
diff --git a/python/lsst/pipe/base/pipeline_graph/_dataset_types.py b/python/lsst/pipe/base/pipeline_graph/_dataset_types.py
index 58f57585d..5732bf9a0 100644
--- a/python/lsst/pipe/base/pipeline_graph/_dataset_types.py
+++ b/python/lsst/pipe/base/pipeline_graph/_dataset_types.py
@@ -89,6 +89,7 @@ def _from_edges(
         get_registered: Callable[[str], DatasetType | None],
         dimensions: DimensionUniverse,
         previous: DatasetTypeNode | None,
+        visualization_only: bool = False,
     ) -> DatasetTypeNode:
         """Construct a dataset type node from its edges.
 
@@ -99,7 +100,7 @@ def _from_edges(
             object in the internal networkx graph.
         xgraph : `networkx.MultiDiGraph`
             The internal networkx graph.
-        get_registered : `~collections.abc.Callable`
+        get_registered : `~collections.abc.Callable` or `None`
             Callable that takes a dataset type name and returns the
             `DatasetType` registered in the data repository, or `None` if it
             is not registered.
@@ -107,6 +108,13 @@ def _from_edges(
             Definitions of all dimensions.
         previous : `DatasetTypeNode` or `None`
             Previous node for this dataset type.
+        visualization_only : `bool`, optional
+            Resolve the graph as well as possible even when dimensions and
+            storage classes cannot really be determined. This can include
+            using ``universe.commonSkyPix`` as the assumed dimensions of
+            connections that use the "skypix" placeholder and using
+            "<UNKNOWN>" as a storage class name (which will fail if the
+            storage class itself is ever actually loaded).
 
         Returns
         -------
@@ -114,7 +122,7 @@ def _from_edges(
             Node consistent with all edges pointing to it and the data
             repository.
         """
-        dataset_type = get_registered(key.name)
+        dataset_type = get_registered(key.name) if get_registered is not None else None
         is_registered = dataset_type is not None
         if previous is not None and previous.dataset_type == dataset_type:
             # This node was already resolved (with exactly the same edges
@@ -160,6 +168,7 @@ def _from_edges(
                 is_registered=is_registered,
                 producer=producer,
                 consumers=consumers,
+                visualization_only=visualization_only,
             )
             consumers.append(consuming_edge.task_label)
         assert dataset_type is not None, "Graph structure guarantees at least one edge."
diff --git a/python/lsst/pipe/base/pipeline_graph/_edges.py b/python/lsst/pipe/base/pipeline_graph/_edges.py
index ee161729a..9ddb475c2 100644
--- a/python/lsst/pipe/base/pipeline_graph/_edges.py
+++ b/python/lsst/pipe/base/pipeline_graph/_edges.py
@@ -470,6 +470,7 @@ def _resolve_dataset_type(
         producer: str | None,
         consumers: Sequence[str],
         is_registered: bool,
+        visualization_only: bool,
     ) -> tuple[DatasetType, bool, bool]:
         """Participate in the construction of the `DatasetTypeNode` object
         associated with this edge.
@@ -499,6 +500,13 @@ def _resolve_dataset_type(
         is_registered : `bool`
             Whether a registration for this dataset type was found in the
             data repository.
+        visualization_only : `bool`
+            Resolve the graph as well as possible even when dimensions and
+            storage classes cannot really be determined. This can include
+            using ``universe.commonSkyPix`` as the assumed dimensions of
+            connections that use the "skypix" placeholder and using
+            "<UNKNOWN>" as a storage class name (which will fail if the
+            storage class itself is ever actually loaded).
 
         Returns
         -------
@@ -528,22 +536,28 @@ def _resolve_dataset_type(
         """
         if "skypix" in self.raw_dimensions:
             if current is None:
-                raise MissingDatasetTypeError(
-                    f"DatasetType '{self.dataset_type_name}' referenced by "
-                    f"{self.task_label!r} uses 'skypix' as a dimension "
-                    f"placeholder, but has not been registered with the data repository. "
-                    f"Note that reference catalog names are now used as the dataset "
-                    f"type name instead of 'ref_cat'."
-                )
-            rest1 = set(universe.conform(self.raw_dimensions - {"skypix"}).names)
-            rest2 = current.dimensions.names - current.dimensions.skypix.names
-            if rest1 != rest2:
-                raise IncompatibleDatasetTypeError(
-                    f"Non-skypix dimensions for dataset type {self.dataset_type_name} declared in "
-                    f"connections ({rest1}) are inconsistent with those in "
-                    f"registry's version of this dataset ({rest2})."
-                )
-            dimensions = current.dimensions.as_group()
+                if visualization_only:
+                    dimensions = universe.conform(
+                        [d if d != "skypix" else universe.commonSkyPix.name for d in self.raw_dimensions]
+                    )
+                else:
+                    raise MissingDatasetTypeError(
+                        f"DatasetType '{self.dataset_type_name}' referenced by "
+                        f"{self.task_label!r} uses 'skypix' as a dimension "
+                        f"placeholder, but has not been registered with the data repository. "
+                        f"Note that reference catalog names are now used as the dataset "
+                        f"type name instead of 'ref_cat'."
+                    )
+            else:
+                rest1 = set(universe.conform(self.raw_dimensions - {"skypix"}).names)
+                rest2 = current.dimensions.names - current.dimensions.skypix.names
+                if rest1 != rest2:
+                    raise IncompatibleDatasetTypeError(
+                        f"Non-skypix dimensions for dataset type {self.dataset_type_name} declared in "
+                        f"connections ({rest1}) are inconsistent with those in "
+                        f"registry's version of this dataset ({rest2})."
+                    )
+                dimensions = current.dimensions.as_group()
         else:
             dimensions = universe.conform(self.raw_dimensions)
         is_initial_query_constraint = is_initial_query_constraint and not self.defer_query_constraint
@@ -576,28 +590,37 @@ def report_current_origin() -> str:
 
         if self.component is not None:
             if current is None:
-                raise MissingDatasetTypeError(
-                    f"Dataset type {self.parent_dataset_type_name!r} is not registered and not produced by "
-                    f"this pipeline, but it used by task {self.task_label!r}, via component "
-                    f"{self.component!r}. This pipeline cannot be resolved until the parent dataset type is "
-                    "registered."
-                )
-            all_current_components = current.storageClass.allComponents()
-            if self.component not in all_current_components:
-                raise IncompatibleDatasetTypeError(
-                    f"Dataset type {self.parent_dataset_type_name!r} has storage class "
-                    f"{current.storageClass_name!r} (from {report_current_origin()}), "
-                    f"which does not include component {self.component!r} "
-                    f"as requested by task {self.task_label!r}."
-                )
-            if all_current_components[self.component].name != self.storage_class_name:
-                raise IncompatibleDatasetTypeError(
-                    f"Dataset type '{self.parent_dataset_type_name}.{self.component}' has storage class "
-                    f"{all_current_components[self.component].name!r} "
-                    f"(from {report_current_origin()}), which does not match "
-                    f"{self.storage_class_name!r}, as requested by task {self.task_label!r}. "
-                    "Note that storage class conversions of components are not supported."
-                )
+                if visualization_only:
+                    current = DatasetType(
+                        self.parent_dataset_type_name,
+                        dimensions,
+                        storageClass="<UNKNOWN>",
+                        isCalibration=self.is_calibration,
+                    )
+                else:
+                    raise MissingDatasetTypeError(
+                        f"Dataset type {self.parent_dataset_type_name!r} is not registered and not produced "
+                        f"by this pipeline, but it is used by task {self.task_label!r}, via component "
+                        f"{self.component!r}. This pipeline cannot be resolved until the parent dataset "
+                        "type is registered."
+                    )
+            else:
+                all_current_components = current.storageClass.allComponents()
+                if self.component not in all_current_components:
+                    raise IncompatibleDatasetTypeError(
+                        f"Dataset type {self.parent_dataset_type_name!r} has storage class "
+                        f"{current.storageClass_name!r} (from {report_current_origin()}), "
+                        f"which does not include component {self.component!r} "
+                        f"as requested by task {self.task_label!r}."
+                    )
+                if all_current_components[self.component].name != self.storage_class_name:
+                    raise IncompatibleDatasetTypeError(
+                        f"Dataset type '{self.parent_dataset_type_name}.{self.component}' has storage class "
+                        f"{all_current_components[self.component].name!r} "
+                        f"(from {report_current_origin()}), which does not match "
+                        f"{self.storage_class_name!r}, as requested by task {self.task_label!r}. "
+                        "Note that storage class conversions of components are not supported."
+                    )
             return current, is_initial_query_constraint, is_prerequisite
         else:
             dataset_type = DatasetType(
diff --git a/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py b/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py
index 6862a7865..e51fa0b55 100644
--- a/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py
+++ b/python/lsst/pipe/base/pipeline_graph/_pipeline_graph.py
@@ -511,6 +511,7 @@ def resolve(
         registry: Registry | None = None,
         dimensions: DimensionUniverse | None = None,
         dataset_types: Mapping[str, DatasetType] | None = None,
+        visualization_only: bool = False,
     ) -> None:
         """Resolve all dimensions and dataset types and check them for
         consistency.
@@ -520,13 +521,23 @@ def resolve(
         Parameters
         ----------
         registry : `lsst.daf.butler.Registry`, optional
-            Client for the data repository to resolve against. If not
-            provided, both ``dimensions`` and ``dataset_types`` must be.
+            Client for the data repository to resolve against.
         dimensions : `lsst.daf.butler.DimensionUniverse`, optional
-            Definitions for all dimensions.
+            Definitions for all dimensions. Takes precedence over
+            ``registry.dimensions`` if both are provided. If neither is
+            provided, falls back to the default dimension universe
+            (``lsst.daf.butler.DimensionUniverse()``).
         dataset_types : `~collection.abc.Mapping` [ `str`, \
             `~lsst.daf.butler.DatasetType` ], optional
-            Mapping of dataset types to consider registered.
+            Mapping of dataset types to consider registered. Takes precedence
+            over ``registry.getDatasetType()`` if both are provided.
+        visualization_only : `bool`, optional
+            Resolve the graph as well as possible even when dimensions and
+            storage classes cannot really be determined. This can include
+            using ``universe.commonSkyPix`` as the assumed dimensions of
+            connections that use the "skypix" placeholder and using
+            "<UNKNOWN>" as a storage class name (which will fail if the
+            storage class itself is ever actually loaded).
 
         Notes
         -----
@@ -556,28 +567,27 @@ def resolve(
             Raised if ``check_edges_unchanged=True`` and the edges of a task
             do change after import and reconfiguration.
         """
-        if registry is None and (dimensions is None or dataset_types is None):
-            raise PipelineGraphError(
-                "Either 'registry' or both 'dimensions' and 'dataset_types' "
-                "must be passed to PipelineGraph.resolve."
-            )
-
-        get_registered: Callable[[str], DatasetType | None]
+        get_registered: Callable[[str], DatasetType | None] | None = None
         if dataset_types is not None:
             # Ruff seems confused about whether this is used below; it is!
             get_registered = dataset_types.get
-        else:
-            assert registry is not None
+        elif registry is not None:
 
             def get_registered(name: str) -> DatasetType | None:
                 try:
                     return registry.getDatasetType(name)
                 except MissingDatasetTypeError:
                     return None
+        else:
+
+            def get_registered(name: str) -> None:
+                return None
 
         if dimensions is None:
-            assert registry is not None
-            dimensions = registry.dimensions
+            if registry is not None:
+                dimensions = registry.dimensions
+            else:
+                dimensions = DimensionUniverse()
 
         node_key: NodeKey
         updates: dict[NodeKey, TaskNode | DatasetTypeNode] = {}
@@ -591,7 +601,12 @@ def get_registered(name: str) -> DatasetType | None:
             case NodeType.DATASET_TYPE:
                 dataset_type_node: DatasetTypeNode | None = node_state["instance"]
                 new_dataset_type_node = DatasetTypeNode._from_edges(
-                    node_key, self._xgraph, get_registered, dimensions, previous=dataset_type_node
+                    node_key,
+                    self._xgraph,
+                    get_registered,
+                    dimensions,
+                    previous=dataset_type_node,
+                    visualization_only=visualization_only,
                 )
                 # Usage of `is`` here is intentional; `_from_edges` returns
                 # `previous=dataset_type_node` if it can determine that it
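
---

Usage sketch (reviewer's note, not part of the patch): the following
illustrates the new keyword under the assumptions of this change.
`Pipeline.from_uri` and `to_graph` are existing `lsst.pipe.base` APIs; the
YAML path is hypothetical.

    from lsst.pipe.base import Pipeline

    # Load a pipeline definition; no data repository is required.
    pipeline = Pipeline.from_uri("my_pipeline.yaml")  # hypothetical path

    # New keyword: resolve for display only. Without a registry, dimensions
    # come from the default DimensionUniverse, "skypix" placeholders are
    # assumed to be universe.commonSkyPix, and storage classes that cannot
    # be determined are recorded as "<UNKNOWN>" (loading one would fail).
    graph = pipeline.to_graph(visualization_only=True)

    # Equivalent lower-level form, via PipelineGraph.resolve:
    graph = pipeline.to_graph()
    graph.resolve(visualization_only=True)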