Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-29761: start saving per-quantum provenance and propagating nothing-to-do cases #183

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 88 additions & 20 deletions python/lsst/pipe/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

from . import config as configMod
from .connectionTypes import (InitInput, InitOutput, Input, PrerequisiteInput,
Output, BaseConnection)
from lsst.daf.butler import DatasetRef, DatasetType, NamedKeyDict, Quantum
Output, BaseConnection, BaseInput)
from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, NamedKeyMapping, Quantum

if typing.TYPE_CHECKING:
from .config import PipelineTaskConfig
Expand Down Expand Up @@ -194,8 +194,8 @@ def __init__(cls, name, bases, dct, **kwargs):


class QuantizedConnection(SimpleNamespace):
"""A Namespace to map defined variable names of connections to their
`lsst.daf.buter.DatasetRef`s
"""A Namespace to map defined variable names of connections to the
associated `lsst.daf.butler.DatasetRef` objects.

This class maps the names used to define a connection on a
PipelineTaskConnectionsClass to the corresponding
Expand Down Expand Up @@ -456,26 +456,38 @@ def buildDatasetRefs(self, quantum: Quantum) -> typing.Tuple[InputQuantizedConne
"in input quantum")
return inputDatasetRefs, outputDatasetRefs

def adjustQuantum(self, datasetRefMap: NamedKeyDict[DatasetType, typing.Set[DatasetRef]]
) -> NamedKeyDict[DatasetType, typing.Set[DatasetRef]]:
def adjustQuantum(
self,
inputs: Iterable[typing.Tuple[str, BaseInput, typing.FrozenSet[DatasetRef]]],
label: str,
dataId: DataCoordinate,
) -> typing.Iterable[typing.Tuple[str, BaseInput, typing.AbstractSet[DatasetRef]]]:
"""Override to make adjustments to `lsst.daf.butler.DatasetRef` objects
in the `lsst.daf.butler.core.Quantum` during the graph generation stage
of the activator.

The base class implementation simply checks that input connections with
``multiple`` set to `False` have no more than one dataset.

Parameters
----------
datasetRefMap : `NamedKeyDict`
Mapping from dataset type to a `set` of
`lsst.daf.butler.DatasetRef` objects
inputs : `Iterable` of `tuple`
Three-element tuples, each a connection name, the connection
instance, and a `frozenset` of associated
`lsst.daf.butler.DatasetRef` objects, for all input and
prerequisite input connections. Guaranteed to support multi-pass
iteration.
label : `str`
Label for this task in the pipeline (should be used in all
diagnostic messages).
dataId : `lsst.daf.butler.DataCoordintae`
Data ID for this quantum in the pipeline (should be used in all
diagnostic messages).

Returns
-------
datasetRefMap : `NamedKeyDict`
Modified mapping of input with possibly adjusted
`lsst.daf.butler.DatasetRef` objects.
adjusted : `Iterable` of `tuple`
Iterable of tuples of the same form as ``inputs``, with adjusted
sets of `lsst.daf.butler.DatasetRef` objects (datasets may be
removed, but not added). Connections not returned at all will be
considered to be unchanged.

Raises
------
Expand All @@ -486,16 +498,72 @@ def adjustQuantum(self, datasetRefMap: NamedKeyDict[DatasetType, typing.Set[Data
Overrides of this function have the option of raising an Exception
if a field in the input does not satisfy a need for a corresponding
pipelineTask, i.e. no reference catalogs are found.

Notes
-----
The base class implementation performs useful checks and should be
called via `super` by most custom implementations. It always returns
an empty iterable, because it makes no changes.
"""
for connection in itertools.chain(iterConnections(self, "inputs"),
iterConnections(self, "prerequisiteInputs")):
refs = datasetRefMap[connection.name]
for name, connection, refs in inputs:
if not connection.multiple and len(refs) > 1:
raise ScalarError(
f"Found multiple datasets {', '.join(str(r.dataId) for r in refs)} "
f"for scalar connection {connection.name} ({refs[0].datasetType.name})."
f"for scalar connection {label}.{name} ({connection.name}) "
f"for quantum data ID {dataId}."
)
return datasetRefMap
return ()

def translateAdjustQuantumInputs(
self,
datasets: NamedKeyMapping[DatasetType, typing.Set[DatasetRef]],
) -> typing.List[typing.Tuple[str, BaseInput, typing.FrozenSet[DatasetRef]]]:
"""Translate a mapping of input datasets keyed on dataset type to the
form expected by the ``input`` argument to `adjustQuantum`.

Parameters
----------
datasets : `lsst.daf.butler.NamedKeyMapping`
Mapping from `DatasetType` to a set of `DatasetRef`. Need not
include all input dataset types; those missing will be mapped to
empty sets in the result.

Returns
-------
translated : `list` of `tuple`
List of 3-element tuples of the form expected as the ``inputs``
argument to `adjustQuantum`. Includes all input and prerequisite
inputs, even there are no associated datasets.
"""
results = []
for connectionName in itertools.chain(self.inputs, self.prerequisiteInputs):
connectionInstance = getattr(self, connectionName)
results.append(
(
connectionName,
connectionInstance,
frozenset(datasets.get(connectionInstance.name, frozenset())),
)
)
return results

def hasPostWriteLogic(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷‍♂️ , I just never know what to do with boolean flag getters; I feel like "starts with verb" implies "it's a method", but there's pretty much no other way to name boolean properties. I'm not even consistent about that within this commit.

Everybody else in the world, please just agree on what we should do in this case and I'll happily follow along.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the implied rest of the question "and not a property"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just an attribute, That's the great thing about properties is that they can be migrated or substituted in place of an attribute if needed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one shouldn't be an attribute, because I think the right specialization pattern is "override method [or property]".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that better than setting a class level attribute? I really don't understand why this is such an uncommon thing. Are you trying to protect it from changing, and if so other code would be needed here.

"""Test whether this `PipelineTask` can fail even after all outputs
have been written.

When this returns `False` (the default base class behavior), execution
harnesses and QuantumGraph generation algorithms may assume that:

- any quantum execution that yielded all predicted outputs was a
success, without checking actual exit status.

- any quantum execution that yields no predicted outputs can be
treated as if `NoWorkQuantum` was raised.

These assumptions enable important optimizations in code that attempts
to quickly determine the status of an executed quantum.
"""
return False


def iterConnections(connections: PipelineTaskConnections,
Expand Down
Loading