diff --git a/doc/changes/DM-40443.removal.rst b/doc/changes/DM-40443.removal.rst
new file mode 100644
index 000000000..cb5c690a8
--- /dev/null
+++ b/doc/changes/DM-40443.removal.rst
@@ -0,0 +1,9 @@
+Removed deprecated code scheduled to be removed after v27:
+
+* Removed ``lsst.pipe.base.graphBuilder``.
+* Removed ``lsst.pipe.base.pipeTools``.
+* Removed ``lsst.pipe.base.BaseConnection.makeDatasetType``
+* Removed ``Pipeline.toExpandedPipeline`` (replaced by ``to_graph``).
+* Removed ``PipelineDatasetTypes`` and ``TaskDatasetTypes``.
+* Removed ``QuantumGraphBuilderError``.
+* APIs no longer accept ``TaskDef``.
diff --git a/python/lsst/pipe/base/__init__.py b/python/lsst/pipe/base/__init__.py
index 72955a953..6635486ec 100644
--- a/python/lsst/pipe/base/__init__.py
+++ b/python/lsst/pipe/base/__init__.py
@@ -1,7 +1,3 @@
-# The graphBuilder module is deprecated, but we still export its symbols for
-# backwards compatibility.
-import warnings
-
from . import automatic_connection_constants, connectionTypes, pipeline_graph, pipelineIR, utils
from ._dataset_handle import *
@@ -17,12 +13,6 @@
from .connections import *
from .executionButlerBuilder import *
from .graph import *
-
-with warnings.catch_warnings():
- warnings.simplefilter("ignore", FutureWarning)
- from .graphBuilder import *
-del warnings
-
from .pipeline import *
# We import the main PipelineGraph type and the module (above), but we don't
diff --git a/python/lsst/pipe/base/connectionTypes.py b/python/lsst/pipe/base/connectionTypes.py
index 397caae2f..cc9985844 100644
--- a/python/lsst/pipe/base/connectionTypes.py
+++ b/python/lsst/pipe/base/connectionTypes.py
@@ -35,8 +35,7 @@
from collections.abc import Callable, Iterable, Sequence
from typing import ClassVar
-from deprecated.sphinx import deprecated as deprecated_sphinx # avoid clash with BaseConnection.deprecated
-from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionUniverse, Registry, StorageClass
+from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, Registry
from lsst.utils.introspection import find_outside_stacklevel
@@ -106,35 +105,6 @@ def __get__(self, inst, klass):
f"Connection {self.varName!r} of {klass.__name__} has been removed."
) from None
- # TODO: remove on DM-40443.
- @deprecated_sphinx(
- reason="Deprecated in favor of PipelineGraph, and will be removed after v27.",
- version="27.0",
- category=FutureWarning,
- )
- def makeDatasetType(
- self, universe: DimensionUniverse, parentStorageClass: StorageClass | str | None = None
- ) -> DatasetType:
- """Construct a true `~lsst.daf.butler.DatasetType` instance with
- normalized dimensions.
-
- Parameters
- ----------
- universe : `lsst.daf.butler.DimensionUniverse`
- Set of all known dimensions to be used to normalize the dimension
- names specified in config.
- parentStorageClass : `lsst.daf.butler.StorageClass` or `str`, optional
- Parent storage class for component datasets; `None` otherwise.
-
- Returns
- -------
- datasetType : `~lsst.daf.butler.DatasetType`
- The `~lsst.daf.butler.DatasetType` defined by this connection.
- """
- return DatasetType(
- self.name, universe.empty, self.storageClass, parentStorageClass=parentStorageClass
- )
-
@dataclasses.dataclass(frozen=True)
class DimensionedConnection(BaseConnection):
@@ -176,39 +146,6 @@ def __post_init__(self):
if not isinstance(self.dimensions, Iterable):
raise TypeError("Dimensions must be iterable of dimensions")
- # TODO: remove on DM-40443.
- @deprecated_sphinx(
- reason="Deprecated in favor of PipelineGraph, and will be removed after v27.",
- version="27.0",
- category=FutureWarning,
- )
- def makeDatasetType(
- self, universe: DimensionUniverse, parentStorageClass: StorageClass | str | None = None
- ) -> DatasetType:
- """Construct a true `~lsst.daf.butler.DatasetType` instance with
- normalized dimensions.
-
- Parameters
- ----------
- universe : `lsst.daf.butler.DimensionUniverse`
- Set of all known dimensions to be used to normalize the dimension
- names specified in config.
- parentStorageClass : `lsst.daf.butler.StorageClass` or `str`, optional
- Parent storage class for component datasets; `None` otherwise.
-
- Returns
- -------
- datasetType : `~lsst.daf.butler.DatasetType`
- The `~lsst.daf.butler.DatasetType` defined by this connection.
- """
- return DatasetType(
- self.name,
- universe.conform(self.dimensions),
- self.storageClass,
- isCalibration=self.isCalibration,
- parentStorageClass=parentStorageClass,
- )
-
@dataclasses.dataclass(frozen=True)
class BaseInput(DimensionedConnection):
diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py
deleted file mode 100644
index f1c6b5adc..000000000
--- a/python/lsst/pipe/base/graphBuilder.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# This file is part of pipe_base.
-#
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (http://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-#
-# This software is dual licensed under the GNU General Public License and also
-# under a 3-clause BSD license. Recipients may choose which of these licenses
-# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
-# respectively. If you choose the GPL option then the following text applies
-# (but note that there is still no warranty even if you opt for BSD instead):
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-
-"""Module defining GraphBuilder class and related methods.
-"""
-
-from __future__ import annotations
-
-__all__ = ["GraphBuilder"]
-
-
-import warnings
-from collections.abc import Iterable, Mapping
-from typing import Any
-
-from deprecated.sphinx import deprecated
-from lsst.daf.butler import Butler, DataCoordinate, Datastore, Registry
-from lsst.daf.butler.registry.wildcards import CollectionWildcard
-from lsst.utils.introspection import find_outside_stacklevel
-
-from ._datasetQueryConstraints import DatasetQueryConstraintVariant
-from .all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
-from .graph import QuantumGraph
-from .pipeline import Pipeline, TaskDef
-from .pipeline_graph import PipelineGraph
-
-# Re-exports for backwards-compatibility.
-from .quantum_graph_builder import GraphBuilderError # noqa: F401
-from .quantum_graph_builder import OutputExistsError # noqa: F401
-from .quantum_graph_builder import PrerequisiteMissingError # noqa: F401
-
-# TODO: remove this module on DM-40443.
-warnings.warn(
- "The graphBuilder module is deprecated in favor of quantum_graph_builder, and will be removed after v27.",
- category=FutureWarning,
- stacklevel=find_outside_stacklevel("lsst.pipe.base"),
-)
-
-
-@deprecated(
- "Deprecated in favor of QuantumGraphBuilder and will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
-)
-class GraphBuilder:
- """GraphBuilder class is responsible for building task execution graph from
- a Pipeline.
-
- Parameters
- ----------
- registry : `~lsst.daf.butler.Registry`
- Data butler instance.
- skipExistingIn : `~typing.Any`
- Expressions representing the collections to search for existing
- output datasets that should be skipped. See
- :ref:`daf_butler_ordered_collection_searches`.
- clobberOutputs : `bool`, optional
- If `True` (default), allow quanta to created even if partial outputs
- exist; this requires the same behavior behavior to be enabled when
- executing.
- datastore : `~lsst.daf.butler.Datastore`, optional
- If not `None` then fill datastore records in each generated Quantum.
- """
-
- def __init__(
- self,
- registry: Registry,
- skipExistingIn: Any = None,
- clobberOutputs: bool = True,
- datastore: Datastore | None = None,
- ):
- self.registry = registry
- self.dimensions = registry.dimensions
- self.skipExistingIn = skipExistingIn
- self.clobberOutputs = clobberOutputs
- self.datastore = datastore
-
- def makeGraph(
- self,
- pipeline: Pipeline | Iterable[TaskDef],
- collections: Any,
- run: str,
- userQuery: str | None,
- datasetQueryConstraint: DatasetQueryConstraintVariant = DatasetQueryConstraintVariant.ALL,
- metadata: Mapping[str, Any] | None = None,
- bind: Mapping[str, Any] | None = None,
- dataId: DataCoordinate | None = None,
- ) -> QuantumGraph:
- """Create execution graph for a pipeline.
-
- Parameters
- ----------
- pipeline : `Pipeline` or `~collections.abc.Iterable` [ `TaskDef` ]
- Pipeline definition, task names/classes and their configs.
- collections : `~typing.Any`
- Expressions representing the collections to search for input
- datasets. See :ref:`daf_butler_ordered_collection_searches`.
- run : `str`
- Name of the `~lsst.daf.butler.CollectionType.RUN` collection for
- output datasets. Collection does not have to exist and it will be
- created when graph is executed.
- userQuery : `str`
- String which defines user-defined selection for registry, should be
- empty or `None` if there is no restrictions on data selection.
- datasetQueryConstraint : `DatasetQueryConstraintVariant`, optional
- The query constraint variant that should be used to constraint the
- query based on dataset existance, defaults to
- `DatasetQueryConstraintVariant.ALL`.
- metadata : Optional Mapping of `str` to primitives
- This is an optional parameter of extra data to carry with the
- graph. Entries in this mapping should be able to be serialized in
- JSON.
- bind : `~collections.abc.Mapping`, optional
- Mapping containing literal values that should be injected into the
- ``userQuery`` expression, keyed by the identifiers they replace.
- dataId : `lsst.daf.butler.DataCoordinate`, optional
- Data ID that should also be included in the query constraint.
- Ignored if ``pipeline`` is a `Pipeline` instance (which has its own
- data ID).
-
- Returns
- -------
- graph : `QuantumGraph`
- The constructed graph.
-
- Raises
- ------
- UserExpressionError
- Raised when user expression cannot be parsed.
- OutputExistsError
- Raised when output datasets already exist.
- Exception
- Other exceptions types may be raised by underlying registry
- classes.
- """
- if isinstance(pipeline, Pipeline):
- pipeline_graph = pipeline.to_graph()
- else:
- pipeline_graph = PipelineGraph(data_id=dataId)
- for task_def in pipeline:
- pipeline_graph.add_task(
- task_def.label,
- task_def.taskClass,
- config=task_def.config,
- connections=task_def.connections,
- )
- # We assume `registry` is actually a RegistryShim that has a butler
- # inside it, since that's now the only kind of Registry code outside
- # Butler should be able to get, and we assert that the datastore came
- # from the same place. Soon this interface will be deprecated in favor
- # of QuantumGraphBuilder (which takes a Butler directly, as all new
- # code should) anyway.
- butler: Butler = self.registry._butler # type: ignore
- assert butler._datastore is self.datastore or self.datastore is None
- qgb = AllDimensionsQuantumGraphBuilder(
- pipeline_graph,
- butler,
- input_collections=CollectionWildcard.from_expression(collections).require_ordered(),
- output_run=run,
- skip_existing_in=self.skipExistingIn if self.skipExistingIn is not None else (),
- clobber=self.clobberOutputs,
- where=userQuery if userQuery is not None else "",
- dataset_query_constraint=datasetQueryConstraint,
- bind=bind,
- )
- return qgb.build(metadata, attach_datastore_records=(self.datastore is not None))
diff --git a/python/lsst/pipe/base/pipeTools.py b/python/lsst/pipe/base/pipeTools.py
deleted file mode 100644
index ed7d1ccf0..000000000
--- a/python/lsst/pipe/base/pipeTools.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# This file is part of pipe_base.
-#
-# Developed for the LSST Data Management System.
-# This product includes software developed by the LSST Project
-# (http://www.lsst.org).
-# See the COPYRIGHT file at the top-level directory of this distribution
-# for details of code ownership.
-#
-# This software is dual licensed under the GNU General Public License and also
-# under a 3-clause BSD license. Recipients may choose which of these licenses
-# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
-# respectively. If you choose the GPL option then the following text applies
-# (but note that there is still no warranty even if you opt for BSD instead):
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-
-"""Module defining few methods to manipulate or query pipelines.
-"""
-
-from __future__ import annotations
-
-# No one should do import * from this module
-__all__ = ["isPipelineOrdered", "orderPipeline"]
-
-import warnings
-from collections.abc import Iterable
-from typing import TYPE_CHECKING
-
-from deprecated.sphinx import deprecated
-from lsst.utils.introspection import find_outside_stacklevel
-
-from .pipeline import Pipeline, TaskDef
-
-# Exceptions re-exported here for backwards compatibility.
-from .pipeline_graph import DuplicateOutputError, PipelineDataCycleError, PipelineGraph # noqa: F401
-
-if TYPE_CHECKING:
- from .taskFactory import TaskFactory
-
-# TODO: remove this module on DM-40443.
-warnings.warn(
- "The pipeTools module and its contents are deprecated in favor of PipelineGraph, and will be removed "
- "after v27.",
- category=FutureWarning,
- stacklevel=find_outside_stacklevel("lsst.pipe.base"),
-)
-
-
-@deprecated(
- "Deprecated and will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
-)
-class MissingTaskFactoryError(Exception):
- """Exception raised when client fails to provide TaskFactory instance."""
-
- pass
-
-
-@deprecated(
- "Deprecated in favor of PipelineGraph methods and will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
-)
-def isPipelineOrdered(pipeline: Pipeline | Iterable[TaskDef], taskFactory: TaskFactory | None = None) -> bool:
- """Check whether tasks in pipeline are correctly ordered.
-
- Pipeline is correctly ordered if for any DatasetType produced by a task
- in a pipeline all its consumer tasks are located after producer.
-
- Parameters
- ----------
- pipeline : `Pipeline` or `collections.abc.Iterable` [ `TaskDef` ]
- Pipeline description.
- taskFactory : `TaskFactory`, optional
- Ignored; present only for backwards compatibility.
-
- Returns
- -------
- is_ordered : `bool`
- True for correctly ordered pipeline, False otherwise.
-
- Raises
- ------
- ImportError
- Raised when task class cannot be imported.
- DuplicateOutputError
- Raised when there is more than one producer for a dataset type.
- """
- if isinstance(pipeline, Pipeline):
- graph = pipeline.to_graph()
- else:
- graph = PipelineGraph()
- for task_def in pipeline:
- graph.add_task(task_def.label, task_def.taskClass, task_def.config, task_def.connections)
- # Can't use graph.is_sorted because that requires sorted dataset type names
- # as well as sorted tasks.
- tasks_xgraph = graph.make_task_xgraph()
- seen: set[str] = set()
- for task_label in tasks_xgraph:
- successors = set(tasks_xgraph.successors(task_label))
- if not successors.isdisjoint(seen):
- return False
- seen.add(task_label)
- return True
-
-
-@deprecated(
- "Deprecated in favor of PipelineGraph methods and will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
-)
-def orderPipeline(pipeline: Pipeline | Iterable[TaskDef]) -> list[TaskDef]:
- """Re-order tasks in pipeline to satisfy data dependencies.
-
- Parameters
- ----------
- pipeline : `Pipeline` or `collections.abc.Iterable` [ `TaskDef` ]
- Pipeline description.
-
- Returns
- -------
- ordered : `list` [ `TaskDef` ]
- Correctly ordered pipeline.
-
- Raises
- ------
- DuplicateOutputError
- Raised when there is more than one producer for a dataset type.
- PipelineDataCycleError
- Raised when the pipeline has dependency cycles.
- """
- if isinstance(pipeline, Pipeline):
- graph = pipeline.to_graph()
- else:
- graph = PipelineGraph()
- for task_def in pipeline:
- graph.add_task(task_def.label, task_def.taskClass, task_def.config, task_def.connections)
- graph.sort()
- return list(graph._iter_task_defs())
diff --git a/python/lsst/pipe/base/pipeline.py b/python/lsst/pipe/base/pipeline.py
index ba9eb3dbe..d29b54ad2 100644
--- a/python/lsst/pipe/base/pipeline.py
+++ b/python/lsst/pipe/base/pipeline.py
@@ -29,7 +29,7 @@
from __future__ import annotations
-__all__ = ["Pipeline", "TaskDef", "TaskDatasetTypes", "PipelineDatasetTypes", "LabelSpecifier"]
+__all__ = ["Pipeline", "TaskDef", "LabelSpecifier"]
import copy
import logging
@@ -39,16 +39,15 @@
# -------------------------------
# Imports of standard modules --
# -------------------------------
-from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Set
+from collections.abc import Callable, Set
from dataclasses import dataclass
from types import MappingProxyType
-from typing import TYPE_CHECKING, ClassVar, cast
+from typing import TYPE_CHECKING, cast
# -----------------------------
# Imports for other modules --
# -----------------------------
-from deprecated.sphinx import deprecated
-from lsst.daf.butler import DataCoordinate, DatasetType, DimensionUniverse, NamedValueSet, Registry
+from lsst.daf.butler import DataCoordinate, DimensionUniverse, Registry
from lsst.resources import ResourcePath, ResourcePathExpression
from lsst.utils import doImportType
from lsst.utils.introspection import get_full_type_name
@@ -57,8 +56,7 @@
from . import pipeline_graph, pipelineIR
from ._instrument import Instrument as PipeBaseInstrument
from .config import PipelineTaskConfig
-from .connections import PipelineTaskConnections, iterConnections
-from .connectionTypes import Input
+from .connections import PipelineTaskConnections
from .pipelineTask import PipelineTask
if TYPE_CHECKING: # Imports needed only for type annotations; may be circular.
@@ -877,30 +875,6 @@ def to_graph(
graph.resolve(registry=registry, visualization_only=visualization_only)
return graph
- # TODO: remove on DM-40443.
- @deprecated(
- reason="Deprecated in favor of to_graph; will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
- )
- def toExpandedPipeline(self) -> Generator[TaskDef, None, None]:
- r"""Return a generator of `TaskDef`\s which can be used to create
- quantum graphs.
-
- Returns
- -------
- generator : generator of `TaskDef`
- The generator returned will be the sorted iterator of tasks which
- are to be used in constructing a quantum graph.
-
- Raises
- ------
- NotImplementedError
- If a dataId is supplied in a config block. This is in place for
- future use.
- """
- yield from self.to_graph()._iter_task_defs()
-
def _add_task_to_graph(self, label: str, graph: pipeline_graph.PipelineGraph) -> None:
"""Add a single task from this pipeline to a pipeline graph that is
under construction.
@@ -929,30 +903,6 @@ def _add_task_to_graph(self, label: str, graph: pipeline_graph.PipelineGraph) ->
)
graph.add_task(label, taskClass, config)
- # TODO: remove on DM-40443.
- @deprecated(
- reason="Deprecated in favor of to_graph; will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
- )
- def __iter__(self) -> Generator[TaskDef, None, None]:
- return self.toExpandedPipeline()
-
- # TODO: remove on DM-40443.
- @deprecated(
- reason="Deprecated in favor of to_graph; will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
- )
- def __getitem__(self, item: str) -> TaskDef:
- # Making a whole graph and then making a TaskDef from that is pretty
- # backwards, but I'm hoping to deprecate this method shortly in favor
- # of making the graph explicitly and working with its node objects.
- graph = pipeline_graph.PipelineGraph()
- self._add_task_to_graph(item, graph)
- (result,) = graph._iter_task_defs()
- return result
-
def __len__(self) -> int:
return len(self._pipelineIR.tasks)
@@ -971,584 +921,3 @@ def __eq__(self, other: object) -> bool:
raise NotImplementedError(
"Pipelines cannot be compared because config instances cannot be compared; see DM-27847."
)
-
-
-# TODO: remove on DM-40443.
-@deprecated(
- reason="TaskDatasetTypes has been replaced by PipelineGraph, and will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
-)
-@dataclass(frozen=True)
-class TaskDatasetTypes:
- """An immutable struct that extracts and classifies the dataset types used
- by a `PipelineTask`.
- """
-
- initInputs: NamedValueSet[DatasetType]
- """Dataset types that are needed as inputs in order to construct this Task.
-
- Task-level `initInputs` may be classified as either
- `~PipelineDatasetTypes.initInputs` or
- `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
- """
-
- initOutputs: NamedValueSet[DatasetType]
- """Dataset types that may be written after constructing this Task.
-
- Task-level `initOutputs` may be classified as either
- `~PipelineDatasetTypes.initOutputs` or
- `~PipelineDatasetTypes.initIntermediates` at the Pipeline level.
- """
-
- inputs: NamedValueSet[DatasetType]
- """Dataset types that are regular inputs to this Task.
-
- If an input dataset needed for a Quantum cannot be found in the input
- collection(s) or produced by another Task in the Pipeline, that Quantum
- (and all dependent Quanta) will not be produced.
-
- Task-level `inputs` may be classified as either
- `~PipelineDatasetTypes.inputs` or `~PipelineDatasetTypes.intermediates`
- at the Pipeline level.
- """
-
- queryConstraints: NamedValueSet[DatasetType]
- """Regular inputs that should not be used as constraints on the initial
- QuantumGraph generation data ID query, according to their tasks
- (`NamedValueSet`).
- """
-
- prerequisites: NamedValueSet[DatasetType]
- """Dataset types that are prerequisite inputs to this Task.
-
- Prerequisite inputs must exist in the input collection(s) before the
- pipeline is run, but do not constrain the graph - if a prerequisite is
- missing for a Quantum, `PrerequisiteMissingError` is raised.
-
- Prerequisite inputs are not resolved until the second stage of
- QuantumGraph generation.
- """
-
- outputs: NamedValueSet[DatasetType]
- """Dataset types that are produced by this Task.
-
- Task-level `outputs` may be classified as either
- `~PipelineDatasetTypes.outputs` or `~PipelineDatasetTypes.intermediates`
- at the Pipeline level.
- """
-
- @classmethod
- def fromTaskDef(
- cls,
- taskDef: TaskDef,
- *,
- registry: Registry,
- include_configs: bool = True,
- storage_class_mapping: Mapping[str, str] | None = None,
- ) -> TaskDatasetTypes:
- """Extract and classify the dataset types from a single `PipelineTask`.
-
- Parameters
- ----------
- taskDef : `TaskDef`
- An instance of a `TaskDef` class for a particular `PipelineTask`.
- registry : `Registry`
- Registry used to construct normalized
- `~lsst.daf.butler.DatasetType` objects and retrieve those that are
- incomplete.
- include_configs : `bool`, optional
- If `True` (default) include config dataset types as
- ``initOutputs``.
- storage_class_mapping : `~collections.abc.Mapping` of `str` to \
- `~lsst.daf.butler.StorageClass`, optional
- If a taskdef contains a component dataset type that is unknown
- to the registry, its parent `~lsst.daf.butler.StorageClass` will
- be looked up in this mapping if it is supplied. If the mapping does
- not contain the composite dataset type, or the mapping is not
- supplied an exception will be raised.
-
- Returns
- -------
- types: `TaskDatasetTypes`
- The dataset types used by this task.
-
- Raises
- ------
- ValueError
- Raised if dataset type connection definition differs from
- registry definition.
- LookupError
- Raised if component parent StorageClass could not be determined
- and storage_class_mapping does not contain the composite type, or
- is set to None.
- """
-
- def makeDatasetTypesSet(
- connectionType: str,
- is_input: bool,
- freeze: bool = True,
- ) -> NamedValueSet[DatasetType]:
- """Construct a set of true `~lsst.daf.butler.DatasetType` objects.
-
- Parameters
- ----------
- connectionType : `str`
- Name of the connection type to produce a set for, corresponds
- to an attribute of type `list` on the connection class
- instance.
- is_input : `bool`
- These are input dataset types, else they are output dataset
- types.
- freeze : `bool`, optional
- If `True`, call `NamedValueSet.freeze` on the object returned.
-
- Returns
- -------
- datasetTypes : `NamedValueSet`
- A set of all datasetTypes which correspond to the input
- connection type specified in the connection class of this
- `PipelineTask`.
-
- Raises
- ------
- ValueError
- Raised if dataset type connection definition differs from
- registry definition.
- LookupError
- Raised if component parent StorageClass could not be determined
- and storage_class_mapping does not contain the composite type,
- or is set to None.
-
- Notes
- -----
- This function is a closure over the variables ``registry`` and
- ``taskDef``, and ``storage_class_mapping``.
- """
- datasetTypes = NamedValueSet[DatasetType]()
- for c in iterConnections(taskDef.connections, connectionType):
- dimensions = set(getattr(c, "dimensions", set()))
- if "skypix" in dimensions:
- try:
- datasetType = registry.getDatasetType(c.name)
- except LookupError as err:
- raise LookupError(
- f"DatasetType '{c.name}' referenced by "
- f"{type(taskDef.connections).__name__} uses 'skypix' as a dimension "
- "placeholder, but does not already exist in the registry. "
- "Note that reference catalog names are now used as the dataset "
- "type name instead of 'ref_cat'."
- ) from err
- rest1 = set(registry.dimensions.conform(dimensions - {"skypix"}).names)
- rest2 = datasetType.dimensions.names - datasetType.dimensions.skypix.names
- if rest1 != rest2:
- raise ValueError(
- f"Non-skypix dimensions for dataset type {c.name} declared in "
- f"connections ({rest1}) are inconsistent with those in "
- f"registry's version of this dataset ({rest2})."
- )
- else:
- # Component dataset types are not explicitly in the
- # registry. This complicates consistency checks with
- # registry and requires we work out the composite storage
- # class.
- registryDatasetType = None
- try:
- registryDatasetType = registry.getDatasetType(c.name)
- except KeyError:
- compositeName, componentName = DatasetType.splitDatasetTypeName(c.name)
- if componentName:
- if storage_class_mapping is None or compositeName not in storage_class_mapping:
- raise LookupError(
- "Component parent class cannot be determined, and "
- "composite name was not in storage class mapping, or no "
- "storage_class_mapping was supplied"
- ) from None
- else:
- parentStorageClass = storage_class_mapping[compositeName]
- else:
- parentStorageClass = None
- datasetType = c.makeDatasetType(
- registry.dimensions, parentStorageClass=parentStorageClass
- )
- registryDatasetType = datasetType
- else:
- datasetType = c.makeDatasetType(
- registry.dimensions, parentStorageClass=registryDatasetType.parentStorageClass
- )
-
- if registryDatasetType and datasetType != registryDatasetType:
- # The dataset types differ but first check to see if
- # they are compatible before raising.
- if is_input:
- # This DatasetType must be compatible on get.
- is_compatible = datasetType.is_compatible_with(registryDatasetType)
- else:
- # Has to be able to be converted to expect type
- # on put.
- is_compatible = registryDatasetType.is_compatible_with(datasetType)
- if is_compatible:
- # For inputs we want the pipeline to use the
- # pipeline definition, for outputs it should use
- # the registry definition.
- if not is_input:
- datasetType = registryDatasetType
- _LOG.debug(
- "Dataset types differ (task %s != registry %s) but are compatible"
- " for %s in %s.",
- datasetType,
- registryDatasetType,
- "input" if is_input else "output",
- taskDef.label,
- )
- else:
- try:
- # Explicitly check for storage class just to
- # make more specific message.
- _ = datasetType.storageClass
- except KeyError:
- raise ValueError(
- "Storage class does not exist for supplied dataset type "
- f"{datasetType} for {taskDef.label}."
- ) from None
- raise ValueError(
- f"Supplied dataset type ({datasetType}) inconsistent with "
- f"registry definition ({registryDatasetType}) "
- f"for {taskDef.label}."
- )
- datasetTypes.add(datasetType)
- if freeze:
- datasetTypes.freeze()
- return datasetTypes
-
- # optionally add initOutput dataset for config
- initOutputs = makeDatasetTypesSet("initOutputs", is_input=False, freeze=False)
- if include_configs:
- initOutputs.add(
- DatasetType(
- taskDef.configDatasetName,
- registry.dimensions.empty,
- storageClass=acc.CONFIG_INIT_OUTPUT_STORAGE_CLASS,
- )
- )
- initOutputs.freeze()
-
- # optionally add output dataset for metadata
- outputs = makeDatasetTypesSet("outputs", is_input=False, freeze=False)
-
- # Metadata is supposed to be of the TaskMetadata type, its dimensions
- # correspond to a task quantum.
- dimensions = registry.dimensions.conform(taskDef.connections.dimensions)
-
- # Allow the storage class definition to be read from the existing
- # dataset type definition if present.
- try:
- current = registry.getDatasetType(taskDef.metadataDatasetName)
- except KeyError:
- # No previous definition so use the default.
- storageClass = acc.METADATA_OUTPUT_STORAGE_CLASS
- else:
- storageClass = current.storageClass.name
- outputs.update({DatasetType(taskDef.metadataDatasetName, dimensions, storageClass)})
-
- if taskDef.logOutputDatasetName is not None:
- # Log output dimensions correspond to a task quantum.
- dimensions = registry.dimensions.conform(taskDef.connections.dimensions)
- outputs.update(
- {
- DatasetType(
- taskDef.logOutputDatasetName,
- dimensions,
- acc.LOG_OUTPUT_STORAGE_CLASS,
- )
- }
- )
-
- outputs.freeze()
-
- inputs = makeDatasetTypesSet("inputs", is_input=True)
- queryConstraints = NamedValueSet(
- inputs[c.name]
- for c in cast(Iterable[Input], iterConnections(taskDef.connections, "inputs"))
- if not c.deferGraphConstraint
- )
-
- return cls(
- initInputs=makeDatasetTypesSet("initInputs", is_input=True),
- initOutputs=initOutputs,
- inputs=inputs,
- queryConstraints=queryConstraints,
- prerequisites=makeDatasetTypesSet("prerequisiteInputs", is_input=True),
- outputs=outputs,
- )
-
-
-# TODO: remove on DM-40443.
-@deprecated(
- reason="PipelineDatasetTypes has been replaced by PipelineGraph, and will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
-)
-@dataclass(frozen=True)
-class PipelineDatasetTypes:
- """An immutable struct that classifies the dataset types used in a
- `Pipeline`.
- """
-
- packagesDatasetName: ClassVar[str] = acc.PACKAGES_INIT_OUTPUT_NAME
- """Name of a dataset type used to save package versions.
- """
-
- initInputs: NamedValueSet[DatasetType]
- """Dataset types that are needed as inputs in order to construct the Tasks
- in this Pipeline.
-
- This does not include dataset types that are produced when constructing
- other Tasks in the Pipeline (these are classified as `initIntermediates`).
- """
-
- initOutputs: NamedValueSet[DatasetType]
- """Dataset types that may be written after constructing the Tasks in this
- Pipeline.
-
- This does not include dataset types that are also used as inputs when
- constructing other Tasks in the Pipeline (these are classified as
- `initIntermediates`).
- """
-
- initIntermediates: NamedValueSet[DatasetType]
- """Dataset types that are both used when constructing one or more Tasks
- in the Pipeline and produced as a side-effect of constructing another
- Task in the Pipeline.
- """
-
- inputs: NamedValueSet[DatasetType]
- """Dataset types that are regular inputs for the full pipeline.
-
- If an input dataset needed for a Quantum cannot be found in the input
- collection(s), that Quantum (and all dependent Quanta) will not be
- produced.
- """
-
- queryConstraints: NamedValueSet[DatasetType]
- """Regular inputs that should be used as constraints on the initial
- QuantumGraph generation data ID query, according to their tasks
- (`NamedValueSet`).
- """
-
- prerequisites: NamedValueSet[DatasetType]
- """Dataset types that are prerequisite inputs for the full Pipeline.
-
- Prerequisite inputs must exist in the input collection(s) before the
- pipeline is run, but do not constrain the graph - if a prerequisite is
- missing for a Quantum, `PrerequisiteMissingError` is raised.
-
- Prerequisite inputs are not resolved until the second stage of
- QuantumGraph generation.
- """
-
- intermediates: NamedValueSet[DatasetType]
- """Dataset types that are output by one Task in the Pipeline and consumed
- as inputs by one or more other Tasks in the Pipeline.
- """
-
- outputs: NamedValueSet[DatasetType]
- """Dataset types that are output by a Task in the Pipeline and not consumed
- by any other Task in the Pipeline.
- """
-
- byTask: Mapping[str, TaskDatasetTypes]
- """Per-Task dataset types, keyed by label in the `Pipeline`.
-
- This is guaranteed to be zip-iterable with the `Pipeline` itself (assuming
- neither has been modified since the dataset types were extracted, of
- course).
- """
-
- @classmethod
- def fromPipeline(
- cls,
- pipeline: Pipeline | Iterable[TaskDef],
- *,
- registry: Registry,
- include_configs: bool = True,
- include_packages: bool = True,
- ) -> PipelineDatasetTypes:
- """Extract and classify the dataset types from all tasks in a
- `Pipeline`.
-
- Parameters
- ----------
- pipeline : `Pipeline` or `~collections.abc.Iterable` [ `TaskDef` ]
- A collection of tasks that can be run together.
- registry : `Registry`
- Registry used to construct normalized
- `~lsst.daf.butler.DatasetType` objects and retrieve those that are
- incomplete.
- include_configs : `bool`, optional
- If `True` (default) include config dataset types as
- ``initOutputs``.
- include_packages : `bool`, optional
- If `True` (default) include the dataset type for software package
- versions in ``initOutputs``.
-
- Returns
- -------
- types: `PipelineDatasetTypes`
- The dataset types used by this `Pipeline`.
-
- Raises
- ------
- ValueError
- Raised if Tasks are inconsistent about which datasets are marked
- prerequisite. This indicates that the Tasks cannot be run as part
- of the same `Pipeline`.
- """
- allInputs = NamedValueSet[DatasetType]()
- allOutputs = NamedValueSet[DatasetType]()
- allInitInputs = NamedValueSet[DatasetType]()
- allInitOutputs = NamedValueSet[DatasetType]()
- prerequisites = NamedValueSet[DatasetType]()
- queryConstraints = NamedValueSet[DatasetType]()
- byTask = dict()
- if include_packages:
- allInitOutputs.add(
- DatasetType(
- cls.packagesDatasetName,
- registry.dimensions.empty,
- storageClass=acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS,
- )
- )
- # create a list of TaskDefs in case the input is a generator
- pipeline = list(pipeline)
-
- # collect all the output dataset types
- typeStorageclassMap: dict[str, str] = {}
- for taskDef in pipeline:
- for outConnection in iterConnections(taskDef.connections, "outputs"):
- typeStorageclassMap[outConnection.name] = outConnection.storageClass
-
- for taskDef in pipeline:
- thisTask = TaskDatasetTypes.fromTaskDef(
- taskDef,
- registry=registry,
- include_configs=include_configs,
- storage_class_mapping=typeStorageclassMap,
- )
- allInitInputs.update(thisTask.initInputs)
- allInitOutputs.update(thisTask.initOutputs)
- allInputs.update(thisTask.inputs)
- # Inputs are query constraints if any task considers them a query
- # constraint.
- queryConstraints.update(thisTask.queryConstraints)
- prerequisites.update(thisTask.prerequisites)
- allOutputs.update(thisTask.outputs)
- byTask[taskDef.label] = thisTask
- if not prerequisites.isdisjoint(allInputs):
- raise ValueError(
- "{} marked as both prerequisites and regular inputs".format(
- {dt.name for dt in allInputs & prerequisites}
- )
- )
- if not prerequisites.isdisjoint(allOutputs):
- raise ValueError(
- "{} marked as both prerequisites and outputs".format(
- {dt.name for dt in allOutputs & prerequisites}
- )
- )
- # Make sure that components which are marked as inputs get treated as
- # intermediates if there is an output which produces the composite
- # containing the component
- intermediateComponents = NamedValueSet[DatasetType]()
- intermediateComposites = NamedValueSet[DatasetType]()
- outputNameMapping = {dsType.name: dsType for dsType in allOutputs}
- for dsType in allInputs:
- # get the name of a possible component
- name, component = dsType.nameAndComponent()
- # if there is a component name, that means this is a component
- # DatasetType, if there is an output which produces the parent of
- # this component, treat this input as an intermediate
- if component is not None:
- # This needs to be in this if block, because someone might have
- # a composite that is a pure input from existing data
- if name in outputNameMapping:
- intermediateComponents.add(dsType)
- intermediateComposites.add(outputNameMapping[name])
-
- def checkConsistency(a: NamedValueSet, b: NamedValueSet) -> None:
- common = a.names & b.names
- for name in common:
- # Any compatibility is allowed. This function does not know
- # if a dataset type is to be used for input or output.
- if not (a[name].is_compatible_with(b[name]) or b[name].is_compatible_with(a[name])):
- raise ValueError(f"Conflicting definitions for dataset type: {a[name]} != {b[name]}.")
-
- checkConsistency(allInitInputs, allInitOutputs)
- checkConsistency(allInputs, allOutputs)
- checkConsistency(allInputs, intermediateComposites)
- checkConsistency(allOutputs, intermediateComposites)
-
- def frozen(s: Set[DatasetType]) -> NamedValueSet[DatasetType]:
- assert isinstance(s, NamedValueSet)
- s.freeze()
- return s
-
- inputs = frozen(allInputs - allOutputs - intermediateComponents)
-
- return cls(
- initInputs=frozen(allInitInputs - allInitOutputs),
- initIntermediates=frozen(allInitInputs & allInitOutputs),
- initOutputs=frozen(allInitOutputs - allInitInputs),
- inputs=inputs,
- queryConstraints=frozen(queryConstraints & inputs),
- # If there are storage class differences in inputs and outputs
- # the intermediates have to choose priority. Here choose that
- # inputs to tasks much match the requested storage class by
- # applying the inputs over the top of the outputs.
- intermediates=frozen(allOutputs & allInputs | intermediateComponents),
- outputs=frozen(allOutputs - allInputs - intermediateComposites),
- prerequisites=frozen(prerequisites),
- byTask=MappingProxyType(byTask), # MappingProxyType -> frozen view of dict for immutability
- )
-
- @classmethod
- def initOutputNames(
- cls,
- pipeline: Pipeline | Iterable[TaskDef],
- *,
- include_configs: bool = True,
- include_packages: bool = True,
- ) -> Iterator[str]:
- """Return the names of dataset types ot task initOutputs, Configs,
- and package versions for a pipeline.
-
- Parameters
- ----------
- pipeline : `Pipeline` or `~collections.abc.Iterable` [ `TaskDef` ]
- A `Pipeline` instance or collection of `TaskDef` instances.
- include_configs : `bool`, optional
- If `True` (default) include config dataset types.
- include_packages : `bool`, optional
- If `True` (default) include the dataset type for package versions.
-
- Yields
- ------
- datasetTypeName : `str`
- Name of the dataset type.
- """
- if include_packages:
- # Package versions dataset type
- yield cls.packagesDatasetName
-
- if isinstance(pipeline, Pipeline):
- pipeline = pipeline.toExpandedPipeline()
-
- for taskDef in pipeline:
- # all task InitOutputs
- for name in taskDef.connections.initOutputs:
- attribute = getattr(taskDef.connections, name)
- yield attribute.name
-
- # config dataset name
- if include_configs:
- yield taskDef.configDatasetName
diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py
index 990992881..261786ddf 100644
--- a/python/lsst/pipe/base/quantum_graph_builder.py
+++ b/python/lsst/pipe/base/quantum_graph_builder.py
@@ -44,7 +44,6 @@
from collections.abc import Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, Any, final
-from deprecated.sphinx import deprecated
from lsst.daf.butler import (
Butler,
CollectionType,
@@ -85,28 +84,13 @@ class QuantumGraphBuilderError(Exception):
pass
-# TODO: remove class and switch downstream inheritance to just
-# QuantumGraphBuilderError on DM-40443.
-@deprecated(
- "Deprecated in favor of QuantumGraphBuilderError and will be removed after v27.",
- version="v27.0",
- category=FutureWarning,
-)
-class GraphBuilderError(QuantumGraphBuilderError):
- """Backwards-compatibility near-alias for QuantumGraphBuilderError."""
-
- pass
-
-
-# Inherit from backwards-compatibility alias for backwards-compatibility.
-class OutputExistsError(GraphBuilderError):
+class OutputExistsError(QuantumGraphBuilderError):
"""Exception generated when output datasets already exist."""
pass
-# Inherit from backwards-compatibility alias for backwards-compatibility.
-class PrerequisiteMissingError(GraphBuilderError):
+class PrerequisiteMissingError(QuantumGraphBuilderError):
"""Exception generated when a prerequisite dataset does not exist."""
pass
diff --git a/python/lsst/pipe/base/taskFactory.py b/python/lsst/pipe/base/taskFactory.py
index 77abc49df..423a25bca 100644
--- a/python/lsst/pipe/base/taskFactory.py
+++ b/python/lsst/pipe/base/taskFactory.py
@@ -39,7 +39,6 @@
if TYPE_CHECKING:
from lsst.daf.butler import DatasetRef, LimitedButler
- from .pipeline import TaskDef
from .pipeline_graph import TaskNode
from .pipelineTask import PipelineTask
@@ -54,7 +53,7 @@ class TaskFactory(metaclass=ABCMeta):
@abstractmethod
def makeTask(
self,
- task_node: TaskDef | TaskNode, # TODO: remove TaskDef on DM-40443.
+ task_node: TaskNode,
/,
butler: LimitedButler,
initInputRefs: Iterable[DatasetRef] | None,
@@ -63,9 +62,8 @@ def makeTask(
Parameters
----------
- task_node : `~pipeline_graph.TaskNode` or `TaskDef`
- Task definition structure. `TaskDef` support is deprecated and
- will be removed after v27.
+ task_node : `~pipeline_graph.TaskNode`
+ Task definition structure.
butler : `lsst.daf.butler.LimitedButler`
Butler instance used to obtain initialization inputs for task.
initInputRefs : `~collections.abc.Iterable` of \
diff --git a/python/lsst/pipe/base/tests/simpleQGraph.py b/python/lsst/pipe/base/tests/simpleQGraph.py
index 60cd86ebb..f0c069567 100644
--- a/python/lsst/pipe/base/tests/simpleQGraph.py
+++ b/python/lsst/pipe/base/tests/simpleQGraph.py
@@ -32,7 +32,6 @@
__all__ = ["AddTaskConfig", "AddTask", "AddTaskFactoryMock"]
import logging
-import warnings
from collections.abc import Iterable, Mapping, MutableMapping
from typing import TYPE_CHECKING, Any, cast
@@ -43,7 +42,7 @@
from lsst.daf.butler.logging import ButlerLogRecords
from lsst.resources import ResourcePath
from lsst.utils import doImportType
-from lsst.utils.introspection import find_outside_stacklevel, get_full_type_name
+from lsst.utils.introspection import get_full_type_name
from .. import connectionTypes as cT
from .._instrument import Instrument
@@ -53,7 +52,7 @@
from ..config import PipelineTaskConfig
from ..connections import PipelineTaskConnections
from ..graph import QuantumGraph
-from ..pipeline import Pipeline, TaskDef
+from ..pipeline import Pipeline
from ..pipeline_graph import PipelineGraph, TaskNode
from ..pipelineTask import PipelineTask
from ..struct import Struct
@@ -178,28 +177,17 @@ def __init__(self, stopAt: int = -1):
def makeTask(
self,
- task_node: TaskDef | TaskNode,
+ task_node: TaskNode,
/,
butler: LimitedButler,
initInputRefs: Iterable[DatasetRef] | None,
) -> PipelineTask:
- if isinstance(task_node, TaskDef):
- # TODO: remove support on DM-40443.
- warnings.warn(
- "Passing TaskDef to TaskFactory is deprecated and will not be supported after v27.",
- FutureWarning,
- find_outside_stacklevel("lsst.pipe.base"),
- )
- task_class = task_node.taskClass
- assert task_class is not None
- else:
- task_class = task_node.task_class
- task = task_class(config=task_node.config, initInputs=None, name=task_node.label)
+ task = task_node.task_class(config=task_node.config, initInputs=None, name=task_node.label)
task.taskFactory = self # type: ignore
return task
-def registerDatasetTypes(registry: Registry, pipeline: Pipeline | Iterable[TaskDef] | PipelineGraph) -> None:
+def registerDatasetTypes(registry: Registry, pipeline: Pipeline | PipelineGraph) -> None:
"""Register all dataset types used by tasks in a registry.
Copied and modified from `PreExecInit.initializeDatasetTypes`.
@@ -208,12 +196,9 @@ def registerDatasetTypes(registry: Registry, pipeline: Pipeline | Iterable[TaskD
----------
registry : `~lsst.daf.butler.Registry`
Registry instance.
- pipeline : `.Pipeline`, `~collections.abc..Iterable` of `.TaskDef`, or \
- `.pipeline_graph.PipelineGraph`
+ pipeline : `.Pipeline`, or `.pipeline_graph.PipelineGraph`
The pipeline whose dataset types should be registered, as a `.Pipeline`
- instance, `.PipelineGraph` instance, or iterable of `.TaskDef`
- instances. Support for `.TaskDef` is deprecated and will be removed
- after v27.
+ instance or `.PipelineGraph` instance.
"""
match pipeline:
case PipelineGraph() as pipeline_graph:
@@ -221,16 +206,7 @@ def registerDatasetTypes(registry: Registry, pipeline: Pipeline | Iterable[TaskD
case Pipeline():
pipeline_graph = pipeline.to_graph()
case _:
- warnings.warn(
- "Passing TaskDefs is deprecated and will not be supported after v27.",
- category=FutureWarning,
- stacklevel=find_outside_stacklevel("lsst.pipe.base"),
- )
- pipeline_graph = PipelineGraph()
- for task_def in pipeline:
- pipeline_graph.add_task(
- task_def.label, task_def.taskClass, task_def.config, connections=task_def.connections
- )
+ raise TypeError(f"Unexpected pipeline argument value: {pipeline}.")
pipeline_graph.resolve(registry)
dataset_types = [node.dataset_type for node in pipeline_graph.dataset_types.values()]
dataset_types.append(