From 12241177a2f5ba36672af956e7fc36c58e150e13 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sat, 2 Sep 2023 10:55:41 -0400 Subject: [PATCH 1/2] Remove interfaces deprecated on DM-40441. --- python/lsst/pipe/base/__init__.py | 10 - python/lsst/pipe/base/connectionTypes.py | 65 +- python/lsst/pipe/base/graphBuilder.py | 190 ------ python/lsst/pipe/base/pipeTools.py | 151 ----- python/lsst/pipe/base/pipeline.py | 641 +----------------- .../lsst/pipe/base/quantum_graph_builder.py | 20 +- python/lsst/pipe/base/taskFactory.py | 8 +- python/lsst/pipe/base/tests/simpleQGraph.py | 40 +- 8 files changed, 19 insertions(+), 1106 deletions(-) delete mode 100644 python/lsst/pipe/base/graphBuilder.py delete mode 100644 python/lsst/pipe/base/pipeTools.py diff --git a/python/lsst/pipe/base/__init__.py b/python/lsst/pipe/base/__init__.py index 72955a953..6635486ec 100644 --- a/python/lsst/pipe/base/__init__.py +++ b/python/lsst/pipe/base/__init__.py @@ -1,7 +1,3 @@ -# The graphBuilder module is deprecated, but we still export its symbols for -# backwards compatibility. -import warnings - from . import automatic_connection_constants, connectionTypes, pipeline_graph, pipelineIR, utils from ._dataset_handle import * @@ -17,12 +13,6 @@ from .connections import * from .executionButlerBuilder import * from .graph import * - -with warnings.catch_warnings(): - warnings.simplefilter("ignore", FutureWarning) - from .graphBuilder import * -del warnings - from .pipeline import * # We import the main PipelineGraph type and the module (above), but we don't diff --git a/python/lsst/pipe/base/connectionTypes.py b/python/lsst/pipe/base/connectionTypes.py index 397caae2f..cc9985844 100644 --- a/python/lsst/pipe/base/connectionTypes.py +++ b/python/lsst/pipe/base/connectionTypes.py @@ -35,8 +35,7 @@ from collections.abc import Callable, Iterable, Sequence from typing import ClassVar -from deprecated.sphinx import deprecated as deprecated_sphinx # avoid clash with BaseConnection.deprecated -from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionUniverse, Registry, StorageClass +from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, Registry from lsst.utils.introspection import find_outside_stacklevel @@ -106,35 +105,6 @@ def __get__(self, inst, klass): f"Connection {self.varName!r} of {klass.__name__} has been removed." ) from None - # TODO: remove on DM-40443. - @deprecated_sphinx( - reason="Deprecated in favor of PipelineGraph, and will be removed after v27.", - version="27.0", - category=FutureWarning, - ) - def makeDatasetType( - self, universe: DimensionUniverse, parentStorageClass: StorageClass | str | None = None - ) -> DatasetType: - """Construct a true `~lsst.daf.butler.DatasetType` instance with - normalized dimensions. - - Parameters - ---------- - universe : `lsst.daf.butler.DimensionUniverse` - Set of all known dimensions to be used to normalize the dimension - names specified in config. - parentStorageClass : `lsst.daf.butler.StorageClass` or `str`, optional - Parent storage class for component datasets; `None` otherwise. - - Returns - ------- - datasetType : `~lsst.daf.butler.DatasetType` - The `~lsst.daf.butler.DatasetType` defined by this connection. - """ - return DatasetType( - self.name, universe.empty, self.storageClass, parentStorageClass=parentStorageClass - ) - @dataclasses.dataclass(frozen=True) class DimensionedConnection(BaseConnection): @@ -176,39 +146,6 @@ def __post_init__(self): if not isinstance(self.dimensions, Iterable): raise TypeError("Dimensions must be iterable of dimensions") - # TODO: remove on DM-40443. - @deprecated_sphinx( - reason="Deprecated in favor of PipelineGraph, and will be removed after v27.", - version="27.0", - category=FutureWarning, - ) - def makeDatasetType( - self, universe: DimensionUniverse, parentStorageClass: StorageClass | str | None = None - ) -> DatasetType: - """Construct a true `~lsst.daf.butler.DatasetType` instance with - normalized dimensions. - - Parameters - ---------- - universe : `lsst.daf.butler.DimensionUniverse` - Set of all known dimensions to be used to normalize the dimension - names specified in config. - parentStorageClass : `lsst.daf.butler.StorageClass` or `str`, optional - Parent storage class for component datasets; `None` otherwise. - - Returns - ------- - datasetType : `~lsst.daf.butler.DatasetType` - The `~lsst.daf.butler.DatasetType` defined by this connection. - """ - return DatasetType( - self.name, - universe.conform(self.dimensions), - self.storageClass, - isCalibration=self.isCalibration, - parentStorageClass=parentStorageClass, - ) - @dataclasses.dataclass(frozen=True) class BaseInput(DimensionedConnection): diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py deleted file mode 100644 index f1c6b5adc..000000000 --- a/python/lsst/pipe/base/graphBuilder.py +++ /dev/null @@ -1,190 +0,0 @@ -# This file is part of pipe_base. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (http://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This software is dual licensed under the GNU General Public License and also -# under a 3-clause BSD license. Recipients may choose which of these licenses -# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, -# respectively. If you choose the GPL option then the following text applies -# (but note that there is still no warranty even if you opt for BSD instead): -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -"""Module defining GraphBuilder class and related methods. -""" - -from __future__ import annotations - -__all__ = ["GraphBuilder"] - - -import warnings -from collections.abc import Iterable, Mapping -from typing import Any - -from deprecated.sphinx import deprecated -from lsst.daf.butler import Butler, DataCoordinate, Datastore, Registry -from lsst.daf.butler.registry.wildcards import CollectionWildcard -from lsst.utils.introspection import find_outside_stacklevel - -from ._datasetQueryConstraints import DatasetQueryConstraintVariant -from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder -from .graph import QuantumGraph -from .pipeline import Pipeline, TaskDef -from .pipeline_graph import PipelineGraph - -# Re-exports for backwards-compatibility. -from .quantum_graph_builder import GraphBuilderError # noqa: F401 -from .quantum_graph_builder import OutputExistsError # noqa: F401 -from .quantum_graph_builder import PrerequisiteMissingError # noqa: F401 - -# TODO: remove this module on DM-40443. -warnings.warn( - "The graphBuilder module is deprecated in favor of quantum_graph_builder, and will be removed after v27.", - category=FutureWarning, - stacklevel=find_outside_stacklevel("lsst.pipe.base"), -) - - -@deprecated( - "Deprecated in favor of QuantumGraphBuilder and will be removed after v27.", - version="v27.0", - category=FutureWarning, -) -class GraphBuilder: - """GraphBuilder class is responsible for building task execution graph from - a Pipeline. - - Parameters - ---------- - registry : `~lsst.daf.butler.Registry` - Data butler instance. - skipExistingIn : `~typing.Any` - Expressions representing the collections to search for existing - output datasets that should be skipped. See - :ref:`daf_butler_ordered_collection_searches`. - clobberOutputs : `bool`, optional - If `True` (default), allow quanta to created even if partial outputs - exist; this requires the same behavior behavior to be enabled when - executing. - datastore : `~lsst.daf.butler.Datastore`, optional - If not `None` then fill datastore records in each generated Quantum. - """ - - def __init__( - self, - registry: Registry, - skipExistingIn: Any = None, - clobberOutputs: bool = True, - datastore: Datastore | None = None, - ): - self.registry = registry - self.dimensions = registry.dimensions - self.skipExistingIn = skipExistingIn - self.clobberOutputs = clobberOutputs - self.datastore = datastore - - def makeGraph( - self, - pipeline: Pipeline | Iterable[TaskDef], - collections: Any, - run: str, - userQuery: str | None, - datasetQueryConstraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL, - metadata: Mapping[str, Any] | None = None, - bind: Mapping[str, Any] | None = None, - dataId: DataCoordinate | None = None, - ) -> QuantumGraph: - """Create execution graph for a pipeline. - - Parameters - ---------- - pipeline : `Pipeline` or `~collections.abc.Iterable` [ `TaskDef` ] - Pipeline definition, task names/classes and their configs. - collections : `~typing.Any` - Expressions representing the collections to search for input - datasets. See :ref:`daf_butler_ordered_collection_searches`. - run : `str` - Name of the `~lsst.daf.butler.CollectionType.RUN` collection for - output datasets. Collection does not have to exist and it will be - created when graph is executed. - userQuery : `str` - String which defines user-defined selection for registry, should be - empty or `None` if there is no restrictions on data selection. - datasetQueryConstraint : `DatasetQueryConstraintVariant`, optional - The query constraint variant that should be used to constraint the - query based on dataset existance, defaults to - `DatasetQueryConstraintVariant.ALL`. - metadata : Optional Mapping of `str` to primitives - This is an optional parameter of extra data to carry with the - graph. Entries in this mapping should be able to be serialized in - JSON. - bind : `~collections.abc.Mapping`, optional - Mapping containing literal values that should be injected into the - ``userQuery`` expression, keyed by the identifiers they replace. - dataId : `lsst.daf.butler.DataCoordinate`, optional - Data ID that should also be included in the query constraint. - Ignored if ``pipeline`` is a `Pipeline` instance (which has its own - data ID). - - Returns - ------- - graph : `QuantumGraph` - The constructed graph. - - Raises - ------ - UserExpressionError - Raised when user expression cannot be parsed. - OutputExistsError - Raised when output datasets already exist. - Exception - Other exceptions types may be raised by underlying registry - classes. - """ - if isinstance(pipeline, Pipeline): - pipeline_graph = pipeline.to_graph() - else: - pipeline_graph = PipelineGraph(data_id=dataId) - for task_def in pipeline: - pipeline_graph.add_task( - task_def.label, - task_def.taskClass, - config=task_def.config, - connections=task_def.connections, - ) - # We assume `registry` is actually a RegistryShim that has a butler - # inside it, since that's now the only kind of Registry code outside - # Butler should be able to get, and we assert that the datastore came - # from the same place. Soon this interface will be deprecated in favor - # of QuantumGraphBuilder (which takes a Butler directly, as all new - # code should) anyway. - butler: Butler = self.registry._butler # type: ignore - assert butler._datastore is self.datastore or self.datastore is None - qgb = AllDimensionsQuantumGraphBuilder( - pipeline_graph, - butler, - input_collections=CollectionWildcard.from_expression(collections).require_ordered(), - output_run=run, - skip_existing_in=self.skipExistingIn if self.skipExistingIn is not None else (), - clobber=self.clobberOutputs, - where=userQuery if userQuery is not None else "", - dataset_query_constraint=datasetQueryConstraint, - bind=bind, - ) - return qgb.build(metadata, attach_datastore_records=(self.datastore is not None)) diff --git a/python/lsst/pipe/base/pipeTools.py b/python/lsst/pipe/base/pipeTools.py deleted file mode 100644 index ed7d1ccf0..000000000 --- a/python/lsst/pipe/base/pipeTools.py +++ /dev/null @@ -1,151 +0,0 @@ -# This file is part of pipe_base. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (http://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This software is dual licensed under the GNU General Public License and also -# under a 3-clause BSD license. Recipients may choose which of these licenses -# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, -# respectively. If you choose the GPL option then the following text applies -# (but note that there is still no warranty even if you opt for BSD instead): -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -"""Module defining few methods to manipulate or query pipelines. -""" - -from __future__ import annotations - -# No one should do import * from this module -__all__ = ["isPipelineOrdered", "orderPipeline"] - -import warnings -from collections.abc import Iterable -from typing import TYPE_CHECKING - -from deprecated.sphinx import deprecated -from lsst.utils.introspection import find_outside_stacklevel - -from .pipeline import Pipeline, TaskDef - -# Exceptions re-exported here for backwards compatibility. -from .pipeline_graph import DuplicateOutputError, PipelineDataCycleError, PipelineGraph # noqa: F401 - -if TYPE_CHECKING: - from .taskFactory import TaskFactory - -# TODO: remove this module on DM-40443. -warnings.warn( - "The pipeTools module and its contents are deprecated in favor of PipelineGraph, and will be removed " - "after v27.", - category=FutureWarning, - stacklevel=find_outside_stacklevel("lsst.pipe.base"), -) - - -@deprecated( - "Deprecated and will be removed after v27.", - version="v27.0", - category=FutureWarning, -) -class MissingTaskFactoryError(Exception): - """Exception raised when client fails to provide TaskFactory instance.""" - - pass - - -@deprecated( - "Deprecated in favor of PipelineGraph methods and will be removed after v27.", - version="v27.0", - category=FutureWarning, -) -def isPipelineOrdered(pipeline: Pipeline | Iterable[TaskDef], taskFactory: TaskFactory | None = None) -> bool: - """Check whether tasks in pipeline are correctly ordered. - - Pipeline is correctly ordered if for any DatasetType produced by a task - in a pipeline all its consumer tasks are located after producer. - - Parameters - ---------- - pipeline : `Pipeline` or `collections.abc.Iterable` [ `TaskDef` ] - Pipeline description. - taskFactory : `TaskFactory`, optional - Ignored; present only for backwards compatibility. - - Returns - ------- - is_ordered : `bool` - True for correctly ordered pipeline, False otherwise. - - Raises - ------ - ImportError - Raised when task class cannot be imported. - DuplicateOutputError - Raised when there is more than one producer for a dataset type. - """ - if isinstance(pipeline, Pipeline): - graph = pipeline.to_graph() - else: - graph = PipelineGraph() - for task_def in pipeline: - graph.add_task(task_def.label, task_def.taskClass, task_def.config, task_def.connections) - # Can't use graph.is_sorted because that requires sorted dataset type names - # as well as sorted tasks. - tasks_xgraph = graph.make_task_xgraph() - seen: set[str] = set() - for task_label in tasks_xgraph: - successors = set(tasks_xgraph.successors(task_label)) - if not successors.isdisjoint(seen): - return False - seen.add(task_label) - return True - - -@deprecated( - "Deprecated in favor of PipelineGraph methods and will be removed after v27.", - version="v27.0", - category=FutureWarning, -) -def orderPipeline(pipeline: Pipeline | Iterable[TaskDef]) -> list[TaskDef]: - """Re-order tasks in pipeline to satisfy data dependencies. - - Parameters - ---------- - pipeline : `Pipeline` or `collections.abc.Iterable` [ `TaskDef` ] - Pipeline description. - - Returns - ------- - ordered : `list` [ `TaskDef` ] - Correctly ordered pipeline. - - Raises - ------ - DuplicateOutputError - Raised when there is more than one producer for a dataset type. - PipelineDataCycleError - Raised when the pipeline has dependency cycles. - """ - if isinstance(pipeline, Pipeline): - graph = pipeline.to_graph() - else: - graph = PipelineGraph() - for task_def in pipeline: - graph.add_task(task_def.label, task_def.taskClass, task_def.config, task_def.connections) - graph.sort() - return list(graph._iter_task_defs()) diff --git a/python/lsst/pipe/base/pipeline.py b/python/lsst/pipe/base/pipeline.py index ba9eb3dbe..d29b54ad2 100644 --- a/python/lsst/pipe/base/pipeline.py +++ b/python/lsst/pipe/base/pipeline.py @@ -29,7 +29,7 @@ from __future__ import annotations -__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes", "LabelSpecifier"] +__all__ = ["Pipeline", "TaskDef", "LabelSpecifier"] import copy import logging @@ -39,16 +39,15 @@ # ------------------------------- # Imports of standard modules -- # ------------------------------- -from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Set +from collections.abc import Callable, Set from dataclasses import dataclass from types import MappingProxyType -from typing import TYPE_CHECKING, ClassVar, cast +from typing import TYPE_CHECKING, cast # ----------------------------- # Imports for other modules -- # ----------------------------- -from deprecated.sphinx import deprecated -from lsst.daf.butler import DataCoordinate, DatasetType, DimensionUniverse, NamedValueSet, Registry +from lsst.daf.butler import DataCoordinate, DimensionUniverse, Registry from lsst.resources import ResourcePath, ResourcePathExpression from lsst.utils import doImportType from lsst.utils.introspection import get_full_type_name @@ -57,8 +56,7 @@ from . import pipeline_graph, pipelineIR from ._instrument import Instrument as PipeBaseInstrument from .config import PipelineTaskConfig -from .connections import PipelineTaskConnections, iterConnections -from .connectionTypes import Input +from .connections import PipelineTaskConnections from .pipelineTask import PipelineTask if TYPE_CHECKING: # Imports needed only for type annotations; may be circular. @@ -877,30 +875,6 @@ def to_graph( graph.resolve(registry=registry, visualization_only=visualization_only) return graph - # TODO: remove on DM-40443. - @deprecated( - reason="Deprecated in favor of to_graph; will be removed after v27.", - version="v27.0", - category=FutureWarning, - ) - def toExpandedPipeline(self) -> Generator[TaskDef, None, None]: - r"""Return a generator of `TaskDef`\s which can be used to create - quantum graphs. - - Returns - ------- - generator : generator of `TaskDef` - The generator returned will be the sorted iterator of tasks which - are to be used in constructing a quantum graph. - - Raises - ------ - NotImplementedError - If a dataId is supplied in a config block. This is in place for - future use. - """ - yield from self.to_graph()._iter_task_defs() - def _add_task_to_graph(self, label: str, graph: pipeline_graph.PipelineGraph) -> None: """Add a single task from this pipeline to a pipeline graph that is under construction. @@ -929,30 +903,6 @@ def _add_task_to_graph(self, label: str, graph: pipeline_graph.PipelineGraph) -> ) graph.add_task(label, taskClass, config) - # TODO: remove on DM-40443. - @deprecated( - reason="Deprecated in favor of to_graph; will be removed after v27.", - version="v27.0", - category=FutureWarning, - ) - def __iter__(self) -> Generator[TaskDef, None, None]: - return self.toExpandedPipeline() - - # TODO: remove on DM-40443. - @deprecated( - reason="Deprecated in favor of to_graph; will be removed after v27.", - version="v27.0", - category=FutureWarning, - ) - def __getitem__(self, item: str) -> TaskDef: - # Making a whole graph and then making a TaskDef from that is pretty - # backwards, but I'm hoping to deprecate this method shortly in favor - # of making the graph explicitly and working with its node objects. - graph = pipeline_graph.PipelineGraph() - self._add_task_to_graph(item, graph) - (result,) = graph._iter_task_defs() - return result - def __len__(self) -> int: return len(self._pipelineIR.tasks) @@ -971,584 +921,3 @@ def __eq__(self, other: object) -> bool: raise NotImplementedError( "Pipelines cannot be compared because config instances cannot be compared; see DM-27847." ) - - -# TODO: remove on DM-40443. -@deprecated( - reason="TaskDatasetTypes has been replaced by PipelineGraph, and will be removed after v27.", - version="v27.0", - category=FutureWarning, -) -@dataclass(frozen=True) -class TaskDatasetTypes: - """An immutable struct that extracts and classifies the dataset types used - by a `PipelineTask`. - """ - - initInputs: NamedValueSet[DatasetType] - """Dataset types that are needed as inputs in order to construct this Task. - - Task-level `initInputs` may be classified as either - `~PipelineDatasetTypes.initInputs` or - `~PipelineDatasetTypes.initIntermediates` at the Pipeline level. - """ - - initOutputs: NamedValueSet[DatasetType] - """Dataset types that may be written after constructing this Task. - - Task-level `initOutputs` may be classified as either - `~PipelineDatasetTypes.initOutputs` or - `~PipelineDatasetTypes.initIntermediates` at the Pipeline level. - """ - - inputs: NamedValueSet[DatasetType] - """Dataset types that are regular inputs to this Task. - - If an input dataset needed for a Quantum cannot be found in the input - collection(s) or produced by another Task in the Pipeline, that Quantum - (and all dependent Quanta) will not be produced. - - Task-level `inputs` may be classified as either - `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates` - at the Pipeline level. - """ - - queryConstraints: NamedValueSet[DatasetType] - """Regular inputs that should not be used as constraints on the initial - QuantumGraph generation data ID query, according to their tasks - (`NamedValueSet`). - """ - - prerequisites: NamedValueSet[DatasetType] - """Dataset types that are prerequisite inputs to this Task. - - Prerequisite inputs must exist in the input collection(s) before the - pipeline is run, but do not constrain the graph - if a prerequisite is - missing for a Quantum, `PrerequisiteMissingError` is raised. - - Prerequisite inputs are not resolved until the second stage of - QuantumGraph generation. - """ - - outputs: NamedValueSet[DatasetType] - """Dataset types that are produced by this Task. - - Task-level `outputs` may be classified as either - `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates` - at the Pipeline level. - """ - - @classmethod - def fromTaskDef( - cls, - taskDef: TaskDef, - *, - registry: Registry, - include_configs: bool = True, - storage_class_mapping: Mapping[str, str] | None = None, - ) -> TaskDatasetTypes: - """Extract and classify the dataset types from a single `PipelineTask`. - - Parameters - ---------- - taskDef : `TaskDef` - An instance of a `TaskDef` class for a particular `PipelineTask`. - registry : `Registry` - Registry used to construct normalized - `~lsst.daf.butler.DatasetType` objects and retrieve those that are - incomplete. - include_configs : `bool`, optional - If `True` (default) include config dataset types as - ``initOutputs``. - storage_class_mapping : `~collections.abc.Mapping` of `str` to \ - `~lsst.daf.butler.StorageClass`, optional - If a taskdef contains a component dataset type that is unknown - to the registry, its parent `~lsst.daf.butler.StorageClass` will - be looked up in this mapping if it is supplied. If the mapping does - not contain the composite dataset type, or the mapping is not - supplied an exception will be raised. - - Returns - ------- - types: `TaskDatasetTypes` - The dataset types used by this task. - - Raises - ------ - ValueError - Raised if dataset type connection definition differs from - registry definition. - LookupError - Raised if component parent StorageClass could not be determined - and storage_class_mapping does not contain the composite type, or - is set to None. - """ - - def makeDatasetTypesSet( - connectionType: str, - is_input: bool, - freeze: bool = True, - ) -> NamedValueSet[DatasetType]: - """Construct a set of true `~lsst.daf.butler.DatasetType` objects. - - Parameters - ---------- - connectionType : `str` - Name of the connection type to produce a set for, corresponds - to an attribute of type `list` on the connection class - instance. - is_input : `bool` - These are input dataset types, else they are output dataset - types. - freeze : `bool`, optional - If `True`, call `NamedValueSet.freeze` on the object returned. - - Returns - ------- - datasetTypes : `NamedValueSet` - A set of all datasetTypes which correspond to the input - connection type specified in the connection class of this - `PipelineTask`. - - Raises - ------ - ValueError - Raised if dataset type connection definition differs from - registry definition. - LookupError - Raised if component parent StorageClass could not be determined - and storage_class_mapping does not contain the composite type, - or is set to None. - - Notes - ----- - This function is a closure over the variables ``registry`` and - ``taskDef``, and ``storage_class_mapping``. - """ - datasetTypes = NamedValueSet[DatasetType]() - for c in iterConnections(taskDef.connections, connectionType): - dimensions = set(getattr(c, "dimensions", set())) - if "skypix" in dimensions: - try: - datasetType = registry.getDatasetType(c.name) - except LookupError as err: - raise LookupError( - f"DatasetType '{c.name}' referenced by " - f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension " - "placeholder, but does not already exist in the registry. " - "Note that reference catalog names are now used as the dataset " - "type name instead of 'ref_cat'." - ) from err - rest1 = set(registry.dimensions.conform(dimensions - {"skypix"}).names) - rest2 = datasetType.dimensions.names - datasetType.dimensions.skypix.names - if rest1 != rest2: - raise ValueError( - f"Non-skypix dimensions for dataset type {c.name} declared in " - f"connections ({rest1}) are inconsistent with those in " - f"registry's version of this dataset ({rest2})." - ) - else: - # Component dataset types are not explicitly in the - # registry. This complicates consistency checks with - # registry and requires we work out the composite storage - # class. - registryDatasetType = None - try: - registryDatasetType = registry.getDatasetType(c.name) - except KeyError: - compositeName, componentName = DatasetType.splitDatasetTypeName(c.name) - if componentName: - if storage_class_mapping is None or compositeName not in storage_class_mapping: - raise LookupError( - "Component parent class cannot be determined, and " - "composite name was not in storage class mapping, or no " - "storage_class_mapping was supplied" - ) from None - else: - parentStorageClass = storage_class_mapping[compositeName] - else: - parentStorageClass = None - datasetType = c.makeDatasetType( - registry.dimensions, parentStorageClass=parentStorageClass - ) - registryDatasetType = datasetType - else: - datasetType = c.makeDatasetType( - registry.dimensions, parentStorageClass=registryDatasetType.parentStorageClass - ) - - if registryDatasetType and datasetType != registryDatasetType: - # The dataset types differ but first check to see if - # they are compatible before raising. - if is_input: - # This DatasetType must be compatible on get. - is_compatible = datasetType.is_compatible_with(registryDatasetType) - else: - # Has to be able to be converted to expect type - # on put. - is_compatible = registryDatasetType.is_compatible_with(datasetType) - if is_compatible: - # For inputs we want the pipeline to use the - # pipeline definition, for outputs it should use - # the registry definition. - if not is_input: - datasetType = registryDatasetType - _LOG.debug( - "Dataset types differ (task %s != registry %s) but are compatible" - " for %s in %s.", - datasetType, - registryDatasetType, - "input" if is_input else "output", - taskDef.label, - ) - else: - try: - # Explicitly check for storage class just to - # make more specific message. - _ = datasetType.storageClass - except KeyError: - raise ValueError( - "Storage class does not exist for supplied dataset type " - f"{datasetType} for {taskDef.label}." - ) from None - raise ValueError( - f"Supplied dataset type ({datasetType}) inconsistent with " - f"registry definition ({registryDatasetType}) " - f"for {taskDef.label}." - ) - datasetTypes.add(datasetType) - if freeze: - datasetTypes.freeze() - return datasetTypes - - # optionally add initOutput dataset for config - initOutputs = makeDatasetTypesSet("initOutputs", is_input=False, freeze=False) - if include_configs: - initOutputs.add( - DatasetType( - taskDef.configDatasetName, - registry.dimensions.empty, - storageClass=acc.CONFIG_INIT_OUTPUT_STORAGE_CLASS, - ) - ) - initOutputs.freeze() - - # optionally add output dataset for metadata - outputs = makeDatasetTypesSet("outputs", is_input=False, freeze=False) - - # Metadata is supposed to be of the TaskMetadata type, its dimensions - # correspond to a task quantum. - dimensions = registry.dimensions.conform(taskDef.connections.dimensions) - - # Allow the storage class definition to be read from the existing - # dataset type definition if present. - try: - current = registry.getDatasetType(taskDef.metadataDatasetName) - except KeyError: - # No previous definition so use the default. - storageClass = acc.METADATA_OUTPUT_STORAGE_CLASS - else: - storageClass = current.storageClass.name - outputs.update({DatasetType(taskDef.metadataDatasetName, dimensions, storageClass)}) - - if taskDef.logOutputDatasetName is not None: - # Log output dimensions correspond to a task quantum. - dimensions = registry.dimensions.conform(taskDef.connections.dimensions) - outputs.update( - { - DatasetType( - taskDef.logOutputDatasetName, - dimensions, - acc.LOG_OUTPUT_STORAGE_CLASS, - ) - } - ) - - outputs.freeze() - - inputs = makeDatasetTypesSet("inputs", is_input=True) - queryConstraints = NamedValueSet( - inputs[c.name] - for c in cast(Iterable[Input], iterConnections(taskDef.connections, "inputs")) - if not c.deferGraphConstraint - ) - - return cls( - initInputs=makeDatasetTypesSet("initInputs", is_input=True), - initOutputs=initOutputs, - inputs=inputs, - queryConstraints=queryConstraints, - prerequisites=makeDatasetTypesSet("prerequisiteInputs", is_input=True), - outputs=outputs, - ) - - -# TODO: remove on DM-40443. -@deprecated( - reason="PipelineDatasetTypes has been replaced by PipelineGraph, and will be removed after v27.", - version="v27.0", - category=FutureWarning, -) -@dataclass(frozen=True) -class PipelineDatasetTypes: - """An immutable struct that classifies the dataset types used in a - `Pipeline`. - """ - - packagesDatasetName: ClassVar[str] = acc.PACKAGES_INIT_OUTPUT_NAME - """Name of a dataset type used to save package versions. - """ - - initInputs: NamedValueSet[DatasetType] - """Dataset types that are needed as inputs in order to construct the Tasks - in this Pipeline. - - This does not include dataset types that are produced when constructing - other Tasks in the Pipeline (these are classified as `initIntermediates`). - """ - - initOutputs: NamedValueSet[DatasetType] - """Dataset types that may be written after constructing the Tasks in this - Pipeline. - - This does not include dataset types that are also used as inputs when - constructing other Tasks in the Pipeline (these are classified as - `initIntermediates`). - """ - - initIntermediates: NamedValueSet[DatasetType] - """Dataset types that are both used when constructing one or more Tasks - in the Pipeline and produced as a side-effect of constructing another - Task in the Pipeline. - """ - - inputs: NamedValueSet[DatasetType] - """Dataset types that are regular inputs for the full pipeline. - - If an input dataset needed for a Quantum cannot be found in the input - collection(s), that Quantum (and all dependent Quanta) will not be - produced. - """ - - queryConstraints: NamedValueSet[DatasetType] - """Regular inputs that should be used as constraints on the initial - QuantumGraph generation data ID query, according to their tasks - (`NamedValueSet`). - """ - - prerequisites: NamedValueSet[DatasetType] - """Dataset types that are prerequisite inputs for the full Pipeline. - - Prerequisite inputs must exist in the input collection(s) before the - pipeline is run, but do not constrain the graph - if a prerequisite is - missing for a Quantum, `PrerequisiteMissingError` is raised. - - Prerequisite inputs are not resolved until the second stage of - QuantumGraph generation. - """ - - intermediates: NamedValueSet[DatasetType] - """Dataset types that are output by one Task in the Pipeline and consumed - as inputs by one or more other Tasks in the Pipeline. - """ - - outputs: NamedValueSet[DatasetType] - """Dataset types that are output by a Task in the Pipeline and not consumed - by any other Task in the Pipeline. - """ - - byTask: Mapping[str, TaskDatasetTypes] - """Per-Task dataset types, keyed by label in the `Pipeline`. - - This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming - neither has been modified since the dataset types were extracted, of - course). - """ - - @classmethod - def fromPipeline( - cls, - pipeline: Pipeline | Iterable[TaskDef], - *, - registry: Registry, - include_configs: bool = True, - include_packages: bool = True, - ) -> PipelineDatasetTypes: - """Extract and classify the dataset types from all tasks in a - `Pipeline`. - - Parameters - ---------- - pipeline : `Pipeline` or `~collections.abc.Iterable` [ `TaskDef` ] - A collection of tasks that can be run together. - registry : `Registry` - Registry used to construct normalized - `~lsst.daf.butler.DatasetType` objects and retrieve those that are - incomplete. - include_configs : `bool`, optional - If `True` (default) include config dataset types as - ``initOutputs``. - include_packages : `bool`, optional - If `True` (default) include the dataset type for software package - versions in ``initOutputs``. - - Returns - ------- - types: `PipelineDatasetTypes` - The dataset types used by this `Pipeline`. - - Raises - ------ - ValueError - Raised if Tasks are inconsistent about which datasets are marked - prerequisite. This indicates that the Tasks cannot be run as part - of the same `Pipeline`. - """ - allInputs = NamedValueSet[DatasetType]() - allOutputs = NamedValueSet[DatasetType]() - allInitInputs = NamedValueSet[DatasetType]() - allInitOutputs = NamedValueSet[DatasetType]() - prerequisites = NamedValueSet[DatasetType]() - queryConstraints = NamedValueSet[DatasetType]() - byTask = dict() - if include_packages: - allInitOutputs.add( - DatasetType( - cls.packagesDatasetName, - registry.dimensions.empty, - storageClass=acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS, - ) - ) - # create a list of TaskDefs in case the input is a generator - pipeline = list(pipeline) - - # collect all the output dataset types - typeStorageclassMap: dict[str, str] = {} - for taskDef in pipeline: - for outConnection in iterConnections(taskDef.connections, "outputs"): - typeStorageclassMap[outConnection.name] = outConnection.storageClass - - for taskDef in pipeline: - thisTask = TaskDatasetTypes.fromTaskDef( - taskDef, - registry=registry, - include_configs=include_configs, - storage_class_mapping=typeStorageclassMap, - ) - allInitInputs.update(thisTask.initInputs) - allInitOutputs.update(thisTask.initOutputs) - allInputs.update(thisTask.inputs) - # Inputs are query constraints if any task considers them a query - # constraint. - queryConstraints.update(thisTask.queryConstraints) - prerequisites.update(thisTask.prerequisites) - allOutputs.update(thisTask.outputs) - byTask[taskDef.label] = thisTask - if not prerequisites.isdisjoint(allInputs): - raise ValueError( - "{} marked as both prerequisites and regular inputs".format( - {dt.name for dt in allInputs & prerequisites} - ) - ) - if not prerequisites.isdisjoint(allOutputs): - raise ValueError( - "{} marked as both prerequisites and outputs".format( - {dt.name for dt in allOutputs & prerequisites} - ) - ) - # Make sure that components which are marked as inputs get treated as - # intermediates if there is an output which produces the composite - # containing the component - intermediateComponents = NamedValueSet[DatasetType]() - intermediateComposites = NamedValueSet[DatasetType]() - outputNameMapping = {dsType.name: dsType for dsType in allOutputs} - for dsType in allInputs: - # get the name of a possible component - name, component = dsType.nameAndComponent() - # if there is a component name, that means this is a component - # DatasetType, if there is an output which produces the parent of - # this component, treat this input as an intermediate - if component is not None: - # This needs to be in this if block, because someone might have - # a composite that is a pure input from existing data - if name in outputNameMapping: - intermediateComponents.add(dsType) - intermediateComposites.add(outputNameMapping[name]) - - def checkConsistency(a: NamedValueSet, b: NamedValueSet) -> None: - common = a.names & b.names - for name in common: - # Any compatibility is allowed. This function does not know - # if a dataset type is to be used for input or output. - if not (a[name].is_compatible_with(b[name]) or b[name].is_compatible_with(a[name])): - raise ValueError(f"Conflicting definitions for dataset type: {a[name]} != {b[name]}.") - - checkConsistency(allInitInputs, allInitOutputs) - checkConsistency(allInputs, allOutputs) - checkConsistency(allInputs, intermediateComposites) - checkConsistency(allOutputs, intermediateComposites) - - def frozen(s: Set[DatasetType]) -> NamedValueSet[DatasetType]: - assert isinstance(s, NamedValueSet) - s.freeze() - return s - - inputs = frozen(allInputs - allOutputs - intermediateComponents) - - return cls( - initInputs=frozen(allInitInputs - allInitOutputs), - initIntermediates=frozen(allInitInputs & allInitOutputs), - initOutputs=frozen(allInitOutputs - allInitInputs), - inputs=inputs, - queryConstraints=frozen(queryConstraints & inputs), - # If there are storage class differences in inputs and outputs - # the intermediates have to choose priority. Here choose that - # inputs to tasks much match the requested storage class by - # applying the inputs over the top of the outputs. - intermediates=frozen(allOutputs & allInputs | intermediateComponents), - outputs=frozen(allOutputs - allInputs - intermediateComposites), - prerequisites=frozen(prerequisites), - byTask=MappingProxyType(byTask), # MappingProxyType -> frozen view of dict for immutability - ) - - @classmethod - def initOutputNames( - cls, - pipeline: Pipeline | Iterable[TaskDef], - *, - include_configs: bool = True, - include_packages: bool = True, - ) -> Iterator[str]: - """Return the names of dataset types ot task initOutputs, Configs, - and package versions for a pipeline. - - Parameters - ---------- - pipeline : `Pipeline` or `~collections.abc.Iterable` [ `TaskDef` ] - A `Pipeline` instance or collection of `TaskDef` instances. - include_configs : `bool`, optional - If `True` (default) include config dataset types. - include_packages : `bool`, optional - If `True` (default) include the dataset type for package versions. - - Yields - ------ - datasetTypeName : `str` - Name of the dataset type. - """ - if include_packages: - # Package versions dataset type - yield cls.packagesDatasetName - - if isinstance(pipeline, Pipeline): - pipeline = pipeline.toExpandedPipeline() - - for taskDef in pipeline: - # all task InitOutputs - for name in taskDef.connections.initOutputs: - attribute = getattr(taskDef.connections, name) - yield attribute.name - - # config dataset name - if include_configs: - yield taskDef.configDatasetName diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py index 990992881..261786ddf 100644 --- a/python/lsst/pipe/base/quantum_graph_builder.py +++ b/python/lsst/pipe/base/quantum_graph_builder.py @@ -44,7 +44,6 @@ from collections.abc import Iterable, Mapping, Sequence from typing import TYPE_CHECKING, Any, final -from deprecated.sphinx import deprecated from lsst.daf.butler import ( Butler, CollectionType, @@ -85,28 +84,13 @@ class QuantumGraphBuilderError(Exception): pass -# TODO: remove class and switch downstream inheritance to just -# QuantumGraphBuilderError on DM-40443. -@deprecated( - "Deprecated in favor of QuantumGraphBuilderError and will be removed after v27.", - version="v27.0", - category=FutureWarning, -) -class GraphBuilderError(QuantumGraphBuilderError): - """Backwards-compatibility near-alias for QuantumGraphBuilderError.""" - - pass - - -# Inherit from backwards-compatibility alias for backwards-compatibility. -class OutputExistsError(GraphBuilderError): +class OutputExistsError(QuantumGraphBuilderError): """Exception generated when output datasets already exist.""" pass -# Inherit from backwards-compatibility alias for backwards-compatibility. -class PrerequisiteMissingError(GraphBuilderError): +class PrerequisiteMissingError(QuantumGraphBuilderError): """Exception generated when a prerequisite dataset does not exist.""" pass diff --git a/python/lsst/pipe/base/taskFactory.py b/python/lsst/pipe/base/taskFactory.py index 77abc49df..423a25bca 100644 --- a/python/lsst/pipe/base/taskFactory.py +++ b/python/lsst/pipe/base/taskFactory.py @@ -39,7 +39,6 @@ if TYPE_CHECKING: from lsst.daf.butler import DatasetRef, LimitedButler - from .pipeline import TaskDef from .pipeline_graph import TaskNode from .pipelineTask import PipelineTask @@ -54,7 +53,7 @@ class TaskFactory(metaclass=ABCMeta): @abstractmethod def makeTask( self, - task_node: TaskDef | TaskNode, # TODO: remove TaskDef on DM-40443. + task_node: TaskNode, /, butler: LimitedButler, initInputRefs: Iterable[DatasetRef] | None, @@ -63,9 +62,8 @@ def makeTask( Parameters ---------- - task_node : `~pipeline_graph.TaskNode` or `TaskDef` - Task definition structure. `TaskDef` support is deprecated and - will be removed after v27. + task_node : `~pipeline_graph.TaskNode` + Task definition structure. butler : `lsst.daf.butler.LimitedButler` Butler instance used to obtain initialization inputs for task. initInputRefs : `~collections.abc.Iterable` of \ diff --git a/python/lsst/pipe/base/tests/simpleQGraph.py b/python/lsst/pipe/base/tests/simpleQGraph.py index 60cd86ebb..f0c069567 100644 --- a/python/lsst/pipe/base/tests/simpleQGraph.py +++ b/python/lsst/pipe/base/tests/simpleQGraph.py @@ -32,7 +32,6 @@ __all__ = ["AddTaskConfig", "AddTask", "AddTaskFactoryMock"] import logging -import warnings from collections.abc import Iterable, Mapping, MutableMapping from typing import TYPE_CHECKING, Any, cast @@ -43,7 +42,7 @@ from lsst.daf.butler.logging import ButlerLogRecords from lsst.resources import ResourcePath from lsst.utils import doImportType -from lsst.utils.introspection import find_outside_stacklevel, get_full_type_name +from lsst.utils.introspection import get_full_type_name from .. import connectionTypes as cT from .._instrument import Instrument @@ -53,7 +52,7 @@ from ..config import PipelineTaskConfig from ..connections import PipelineTaskConnections from ..graph import QuantumGraph -from ..pipeline import Pipeline, TaskDef +from ..pipeline import Pipeline from ..pipeline_graph import PipelineGraph, TaskNode from ..pipelineTask import PipelineTask from ..struct import Struct @@ -178,28 +177,17 @@ def __init__(self, stopAt: int = -1): def makeTask( self, - task_node: TaskDef | TaskNode, + task_node: TaskNode, /, butler: LimitedButler, initInputRefs: Iterable[DatasetRef] | None, ) -> PipelineTask: - if isinstance(task_node, TaskDef): - # TODO: remove support on DM-40443. - warnings.warn( - "Passing TaskDef to TaskFactory is deprecated and will not be supported after v27.", - FutureWarning, - find_outside_stacklevel("lsst.pipe.base"), - ) - task_class = task_node.taskClass - assert task_class is not None - else: - task_class = task_node.task_class - task = task_class(config=task_node.config, initInputs=None, name=task_node.label) + task = task_node.task_class(config=task_node.config, initInputs=None, name=task_node.label) task.taskFactory = self # type: ignore return task -def registerDatasetTypes(registry: Registry, pipeline: Pipeline | Iterable[TaskDef] | PipelineGraph) -> None: +def registerDatasetTypes(registry: Registry, pipeline: Pipeline | PipelineGraph) -> None: """Register all dataset types used by tasks in a registry. Copied and modified from `PreExecInit.initializeDatasetTypes`. @@ -208,12 +196,9 @@ def registerDatasetTypes(registry: Registry, pipeline: Pipeline | Iterable[TaskD ---------- registry : `~lsst.daf.butler.Registry` Registry instance. - pipeline : `.Pipeline`, `~collections.abc..Iterable` of `.TaskDef`, or \ - `.pipeline_graph.PipelineGraph` + pipeline : `.Pipeline`, or `.pipeline_graph.PipelineGraph` The pipeline whose dataset types should be registered, as a `.Pipeline` - instance, `.PipelineGraph` instance, or iterable of `.TaskDef` - instances. Support for `.TaskDef` is deprecated and will be removed - after v27. + instance or `.PipelineGraph` instance. """ match pipeline: case PipelineGraph() as pipeline_graph: @@ -221,16 +206,7 @@ def registerDatasetTypes(registry: Registry, pipeline: Pipeline | Iterable[TaskD case Pipeline(): pipeline_graph = pipeline.to_graph() case _: - warnings.warn( - "Passing TaskDefs is deprecated and will not be supported after v27.", - category=FutureWarning, - stacklevel=find_outside_stacklevel("lsst.pipe.base"), - ) - pipeline_graph = PipelineGraph() - for task_def in pipeline: - pipeline_graph.add_task( - task_def.label, task_def.taskClass, task_def.config, connections=task_def.connections - ) + raise TypeError(f"Unexpected pipeline argument value: {pipeline}.") pipeline_graph.resolve(registry) dataset_types = [node.dataset_type for node in pipeline_graph.dataset_types.values()] dataset_types.append( From 76fc56e11ad1826d9d42cbf4d870e2d8c34564d9 Mon Sep 17 00:00:00 2001 From: Tim Jenness Date: Fri, 9 Aug 2024 09:47:40 -0700 Subject: [PATCH 2/2] Add news fragment --- doc/changes/DM-40443.removal.rst | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 doc/changes/DM-40443.removal.rst diff --git a/doc/changes/DM-40443.removal.rst b/doc/changes/DM-40443.removal.rst new file mode 100644 index 000000000..cb5c690a8 --- /dev/null +++ b/doc/changes/DM-40443.removal.rst @@ -0,0 +1,9 @@ +Removed deprecated code scheduled to be removed after v27: + +* Removed ``lsst.pipe.base.graphBuilder``. +* Removed ``lsst.pipe.base.pipeTools``. +* Removed ``lsst.pipe.base.BaseConnection.makeDatasetType`` +* Removed ``Pipeline.toExpandedPipeline`` (replaced by ``to_graph``). +* Removed ``PipelineDatasetTypes`` and ``TaskDatasetTypes``. +* Removed ``QuantumGraphBuilderError``. +* APIs no longer accept ``TaskDef``.