diff --git a/python/lsst/pipe/base/connectionTypes.py b/python/lsst/pipe/base/connectionTypes.py index 7a212366f..04b49ab5b 100644 --- a/python/lsst/pipe/base/connectionTypes.py +++ b/python/lsst/pipe/base/connectionTypes.py @@ -28,7 +28,7 @@ import dataclasses import typing -from typing import Callable, Iterable, Optional +from typing import Callable, ClassVar, Iterable, Optional from lsst.daf.butler import ( CollectionSearch, @@ -194,6 +194,12 @@ class BaseInput(DimensionedConnection): """ deferLoad: bool = False + optional: ClassVar[bool] = False + """Quanta are not generated and never (fully) executed when there are no + datasets for a non-optional input connection, and most input connections + cannot be made optional. + """ + @dataclasses.dataclass(frozen=True) class Input(BaseInput): @@ -225,6 +231,12 @@ class PrerequisiteInput(BaseInput): using the DatasetType, registry, quantum dataId, and input collections passed to it. If no function is specified, the default temporal spatial lookup will be used. + optional : `bool`, optional + If `True` (`False` is default), generate quanta for this task even + when no input datasets for this connection exist. Missing optional + inputs will cause `ButlerQuantumContext.get` to return `None` or a + smaller container (never a same-size container with some `None` + elements) during execution. Notes ----- @@ -249,10 +261,12 @@ class PrerequisiteInput(BaseInput): between dimensions are not desired (e.g. a task that wants all detectors in each visit for which the visit overlaps a tract, not just those where that detector+visit combination overlaps the tract). + - Prerequisite inputs may be optional (regular inputs are never optional). 
""" lookupFunction: Optional[Callable[[DatasetType, Registry, DataCoordinate, CollectionSearch], Iterable[DatasetRef]]] = None + optional: bool = False @dataclasses.dataclass(frozen=True) diff --git a/python/lsst/pipe/base/connections.py b/python/lsst/pipe/base/connections.py index e0ba1cccd..e14ee3a8a 100644 --- a/python/lsst/pipe/base/connections.py +++ b/python/lsst/pipe/base/connections.py @@ -35,8 +35,9 @@ from . import config as configMod from .connectionTypes import (InitInput, InitOutput, Input, PrerequisiteInput, - Output, BaseConnection) -from lsst.daf.butler import DatasetRef, DatasetType, NamedKeyDict, Quantum + Output, BaseConnection, BaseInput) +from .executed_quantum import NoWorkQuantum +from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, NamedKeyMapping, Quantum if typing.TYPE_CHECKING: from .config import PipelineTaskConfig @@ -194,8 +195,8 @@ def __init__(cls, name, bases, dct, **kwargs): class QuantizedConnection(SimpleNamespace): - """A Namespace to map defined variable names of connections to their - `lsst.daf.buter.DatasetRef`s + """A Namespace to map defined variable names of connections to the + associated `lsst.daf.butler.DatasetRef` objects. 
This class maps the names used to define a connection on a PipelineTaskConnectionsClass to the corresponding @@ -456,26 +457,38 @@ def buildDatasetRefs(self, quantum: Quantum) -> typing.Tuple[InputQuantizedConne "in input quantum") return inputDatasetRefs, outputDatasetRefs - def adjustQuantum(self, datasetRefMap: NamedKeyDict[DatasetType, typing.Set[DatasetRef]] - ) -> NamedKeyDict[DatasetType, typing.Set[DatasetRef]]: + def adjustQuantum( + self, + inputs: typing.Iterable[typing.Tuple[str, BaseInput, typing.FrozenSet[DatasetRef]]], + label: str, + dataId: DataCoordinate, + ) -> typing.Iterable[typing.Tuple[str, BaseInput, typing.AbstractSet[DatasetRef]]]: """Override to make adjustments to `lsst.daf.butler.DatasetRef` objects in the `lsst.daf.butler.core.Quantum` during the graph generation stage of the activator. - The base class implementation simply checks that input connections with - ``multiple`` set to `False` have no more than one dataset. - Parameters ---------- - datasetRefMap : `NamedKeyDict` - Mapping from dataset type to a `set` of - `lsst.daf.butler.DatasetRef` objects + inputs : `Iterable` of `tuple` + Three-element tuples, each a connection name, the connection + instance, and a `frozenset` of associated + `lsst.daf.butler.DatasetRef` objects, for all input and + prerequisite input connections. Guaranteed to support multi-pass + iteration. + label : `str` + Label for this task in the pipeline (should be used in all + diagnostic messages). + dataId : `lsst.daf.butler.DataCoordinate` + Data ID for this quantum in the pipeline (should be used in all + diagnostic messages). Returns ------- - datasetRefMap : `NamedKeyDict` - Modified mapping of input with possibly adjusted - `lsst.daf.butler.DatasetRef` objects. + adjusted : `Iterable` of `tuple` + Iterable of tuples of the same form as ``inputs``, with adjusted + sets of `lsst.daf.butler.DatasetRef` objects (datasets may be + removed, but not added). 
Connections not returned at all will be + considered to be unchanged. Raises ------ @@ -486,16 +499,93 @@ def adjustQuantum(self, datasetRefMap: NamedKeyDict[DatasetType, typing.Set[Data Overrides of this function have the option of raising an Exception if a field in the input does not satisfy a need for a corresponding pipelineTask, i.e. no reference catalogs are found. + NoWorkQuantum + Raised to indicate that this quantum should not be run; one or more + of its expected inputs do not exist, and if possible, should be + pruned from the QuantumGraph. + + Notes + ----- + The base class implementation performs useful checks and should be + called via `super` by most custom implementations. It always returns + an empty iterable, because it makes no changes. """ - for connection in itertools.chain(iterConnections(self, "inputs"), - iterConnections(self, "prerequisiteInputs")): - refs = datasetRefMap[connection.name] + for name, connection, refs in inputs: if not connection.multiple and len(refs) > 1: raise ScalarError( f"Found multiple datasets {', '.join(str(r.dataId) for r in refs)} " - f"for scalar connection {connection.name} ({refs[0].datasetType.name})." + f"for scalar connection {label}.{name} ({connection.name}) " + f"for quantum data ID {dataId}." + ) + if not connection.optional and not refs: + if isinstance(connection, PrerequisiteInput): + # This branch should only be possible during QG generation, + # or if someone deleted the dataset between making the QG + # and trying to run it. Either one should be a hard error. + raise FileNotFoundError( + f"No datasets found for non-optional connection {label}.{name} ({connection.name}) " + f"for quantum data ID {dataId}." + ) + else: + # This branch should be impossible during QG generation, + # because that algorithm can only make quanta whose inputs + # are either already present or should be created during + # execution. 
It can trigger during execution if the input + # wasn't actually created by an upstream task in the same + # graph. + raise NoWorkQuantum(label, name, connection) + return () + + def translateAdjustQuantumInputs( + self, + datasets: NamedKeyMapping[DatasetType, typing.Set[DatasetRef]], + ) -> typing.List[typing.Tuple[str, BaseInput, typing.FrozenSet[DatasetRef]]]: + """Translate a mapping of input datasets keyed on dataset type to the + form expected by the ``input`` argument to `adjustQuantum`. + + Parameters + ---------- + datasets : `lsst.daf.butler.NamedKeyMapping` + Mapping from `DatasetType` to a set of `DatasetRef`. Need not + include all input dataset types; those missing will be mapped to + empty sets in the result. + + Returns + ------- + translated : `list` of `tuple` + List of 3-element tuples of the form expected as the ``inputs`` + argument to `adjustQuantum`. Includes all input and prerequisite + inputs, even there are no associated datasets. + """ + results = [] + for connectionName in itertools.chain(self.inputs, self.prerequisiteInputs): + connectionInstance = getattr(self, connectionName) + results.append( + ( + connectionName, + connectionInstance, + frozenset(datasets.get(connectionInstance.name, frozenset())), ) - return datasetRefMap + ) + return results + + def hasPostWriteLogic(self): + """Test whether this `PipelineTask` can fail even after all outputs + have been written. + + When this returns `False` (the default base class behavior), execution + harnesses and QuantumGraph generation algorithms may assume that: + + - any quantum execution that yielded all predicted outputs was a + success, without checking actual exit status. + + - any quantum execution that yields no predicted outputs can be + treated as if `NoWorkQuantum` was raised. + + These assumptions enable important optimizations in code that attempts + to quickly determine the status of an executed quantum. 
+ """ + return False def iterConnections(connections: PipelineTaskConnections, diff --git a/python/lsst/pipe/base/executed_quantum.py b/python/lsst/pipe/base/executed_quantum.py new file mode 100644 index 000000000..718de3a77 --- /dev/null +++ b/python/lsst/pipe/base/executed_quantum.py @@ -0,0 +1,253 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +__all__ = ( +) + +import enum +from typing import FrozenSet +import pydantic + +from lsst.daf.butler import DatasetId + + +class NoWorkQuantum(BaseException): + """An exception raised when a Quantum should not exist because there is no + work for it to do. + + This usually occurs because a non-optional input dataset is not present, or + a spatiotemporal overlap that was conservatively predicted does not + actually exist. + + This inherits from BaseException because it is used to signal a case that + we don't consider a real error, even though we often want to use try/except + logic to trap it. 
+ """ + + +class RepeatableQuantumError(RuntimeError): + """Exception that may be raised by PipelineTasks (and code they delegate + to) in order to indicate that a repeatable problem that will not be + addressed by retries. + + This usually indicates that the algorithm and the data it has been given + are somehow incompatible, and the task should run fine on most other data. + + This exception may be used as a base class for more specific questions, or + used directly while chaining another exception, e.g.:: + + try: + run_code() + except SomeOtherError as err: + raise RepeatableQuantumError() from err + + This may be used for missing input data when the desired behavior is to + cause all downstream tasks being run be blocked, forcing the user to + address the problem. When the desired behavior is to skip this quantum and + attempt downstream tasks (or skip them) without its outputs, raise + `NoWorkQuantum` instead. When the desired behavior is to write only some + outputs, the task should exit as usual and will be considered a success. + """ + + +class InvalidQuantumError(Exception): + """Exception that may be raised by PipelineTasks (and code they delegate + to) in order to indicate logic bug or configuration problem. + + This usually indicates that the configured algorithm itself is invalid and + will not run on a significant fraction of quanta (often all of them). + + This exception may be used as a base class for more specific questions, or + used directly while chaining another exception, e.g.:: + + try: + run_code() + except SomeOtherError as err: + raise RepeatableQuantumError() from err + + Raising this exception in `PipelineTask.runQuantum` or something it calls + is a last resort - whenever possible, such problems should cause exceptions + in ``__init__`` or in QuantumGraph generation. It should never be used + for missing data. + """ + + +class QuantumStatusCategory(enum.Enum): + SUCCEEDED = enum.auto() + """Quantum ran to completion. 
+ + This usually means at least some predicted outputs were actually produced, + but does not guarantee it unless + `PipelineTaskConnections.hasPostWriteLogic` returns `False`. + """ + + NO_WORK_FOUND = enum.auto() + """Quantum was run but found it had no work to do, and produced no outputs + (other than metadata). + + Rerunning a task that had this status will change the result only if its + `~ExtendedQuantumStatus.available_inputs` change. This status may be + invoked by a `PipelineTask` by raising `NoWorkQuantum`. + """ + + NO_WORK_SKIPPED = enum.auto() + """Quantum was not run by the execution harness, because at least one + required input was predicted but not actually produced by an upstream task. + + Tasks with this state should have metadata written directly by the + execution harness, and should never be rerun unless its + `~ExtendedQuantumStatus.available_inputs` change such that all required + inputs are now available. + """ + + INTERRUPTED = enum.auto() + """Quantum caught an external signal indicating it should stop execution, + and then shut down cleanly. + + This state should never be set if all predicted outputs were produced and + `PipelineTaskConnections.hasPostWriteLogic` returns `False`; execution + harnesses should record this as a success even if a last-moment + interruption attempt was detected. + """ + + FAILED_EXCEPTION = enum.auto() + """Quantum raised a Python exception that was caught by the execution + harness. + + This state does not attempt to distinguish between repeatable problems + and transient ones; rerunning a quantum with this status may or may not + yield a different result. + """ + + FAILED_UNKNOWN = enum.auto() + """Quantum failed for an unknown reason. + + This state does not attempt to distinguish between repeatable problems + and transient ones; rerunning a quantum with this status may or may not + yield a different result. 
+ + This state cannot usually be set by Python execution harnesses that run + in the same process as the task code, but it may be set by higher-level + systems in the case of e.g. segfaults, and should be assumed in cases where + the file or dataset that would normally contain status information isn't + present at all. + """ + + FAILED_REPEATABLE = enum.auto() + """Quantum failed due to a problem that the task was able to recognize as + non-transient and highly likely to affect any attempt to rerun this + quantum. + + This status can be invoked by a `PipelineTask` by raising + `RepeatableQuantumError`. + """ + + FAILED_INVALID = enum.auto() + """Quantum failed because of a configuration problem or task logic bug that + must be fixed by the user. + + Execution harnesses may shut down entire runs if this status is detected in + any quantum. + + This should be set if a task failure (not an interruption) occurs after all + predicted outputs have been produced and + `PipelineTaskConnections.hasPostWriteLogic` returns `False`, as this + indicates that this method has been implemented incorrectly. + + This status can be invoked by a `PipelineTask` by raising + `InvalidQuantumError`. + """ + + @property + def can_retry(self) -> bool: + return self is self.FAILED_EXCEPTION or self is self.INTERRUPTED + + @property + def is_no_work(self) -> bool: + return self is self.NO_WORK_FOUND or self is self.NO_WORK_SKIPPED + + @property + def is_success(self) -> bool: + return self is self.SUCCEEDED or self.is_no_work + + @property + def is_failure(self) -> bool: + return ( + self is self.FAILED_EXCEPTION + or self is self.FAILED_UNKNOWN + or self is self.FAILED_REPEATABLE + or self is self.FAILED_INVALID + ) + + +class ExtendedQuantumStatus(pydantic.BaseModel): + """Struct used to record the state of a quantum that has been run. + """ + + category: QuantumStatusCategory + """Category describing the qualitative execution status for this quantum. 
+ """ + + available_inputs: FrozenSet[DatasetId] = frozenset() + """IDs of all input datasets that were actually available to the task + at execution time. + + This may differ from the predicted inputs by removal of datasets that + were not actually produced by upstream tasks. + + This field will be set for all quanta for which provenance is successfully + written, regardless of status category. + """ + + actual_inputs: FrozenSet[DatasetId] = frozenset() + """IDs of all input datasets actually used by the task. + + Any dataset that can affect the output of the algorithm should be included. + For example, if a dataset is ultimately identified as some kind of outlier, + but was itself used in the determination of whether other datasets were or + were not outliers, it is still considered an actual input. + + If a `PipelineTask` never reads a dataset at all, it will automatically be + removed from `actual_inputs`. It may also explicitly call + `ButlerQuantumContext.makeInputUnused`. + """ + + actual_outputs: FrozenSet[DatasetId] = frozenset() + """IDs of all output dataset actually produced by this task. + + This is set automatically by calls to `ButlerQuantumContext.put`; + `PipelineTask` authors should not have to do anything manually. + """ + + # + # Notably missing: + # + # - Quantum identifiers. I'd like to wait for DM-30266, and then we need + # to think about how much we want to normalize/denormalize predicted and + # executed quantum information. + # + # - Exception objects. These look like a pain to serialize well, but + # doing it well seems really valuable. Maybe + # https://github.com/ionelmc/python-tblib? + # + # - Host information and resource usage. Just haven't gotten around to it, + # and I bet other people have schemas I should just copy from. 
+ # diff --git a/python/lsst/pipe/base/graphBuilder.py b/python/lsst/pipe/base/graphBuilder.py index fc45b4fcb..65b6f7428 100644 --- a/python/lsst/pipe/base/graphBuilder.py +++ b/python/lsst/pipe/base/graphBuilder.py @@ -250,8 +250,14 @@ def makeQuantum(self) -> Quantum: # Give the task's Connections class an opportunity to remove some # inputs, or complain if they are unacceptable. # This will raise if one of the check conditions is not met, which is - # the intended behavior - allInputs = self.task.taskDef.connections.adjustQuantum(allInputs) + # the intended behavior. + adjustedInputTuples = self.task.taskDef.connections.adjustQuantum( + self.task.taskDef.connections.translateAdjustQuantumInputs(allInputs), + label=self.task.taskDef.label, + dataId=self.dataId, + ) + for _, connectionInstance, updatedRefs in adjustedInputTuples: + allInputs[connectionInstance.name] = updatedRefs return Quantum( taskName=self.task.taskDef.taskName, taskClass=self.task.taskDef.taskClass,