Skip to content

Commit

Permalink
Add step definitions section to Pipelines
Browse files Browse the repository at this point in the history
Added a directive to Pipelines that allows specifying which
subsets correspond to an end to end processing step and what
dimensions that those steps are expected to operate on.
  • Loading branch information
natelust committed Jan 18, 2024
1 parent 4e68ae3 commit ba8243d
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 1 deletion.
1 change: 1 addition & 0 deletions doc/changes/DM-41650.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a section to pipelines which allows the explicit declaration of which susbsets correspond to steps and the dimensions the step's quanta can be sharded with.
27 changes: 27 additions & 0 deletions doc/lsst.pipe.base/creating-a-pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,33 @@ Once a ``subset`` is created the label associated with it can be used in
any context where task labels are accepted. Examples of this will be shown
in :ref:`pipeline-running-intro`.

.. _pipeline_creating_steps:

-----
Steps
-----
Subsets are designed to be an encapsulation of a collection of tasks that are
useful to be run together. These runs could be for reasons as small as a
producing quick set of QA debugging plots or as large as divisions of the
complete pipeline for survey level production.

Because divisions of the pipeline have a special place in survey production, pipelines have a special place where they are highlighted, the ``steps`` key.
This key is a list of the labels for all subsets that are considered steps in end to end processing.
Alongside each label, a step must be declared with the set of dimensions the step is expected to run over.
An example of what the step syntax looks like can be seen in the example below.
These bits of information allow campaign management / batch production software better reason about how to handle the processing workflow found withing a pipeline.

.. code-block:: yaml
steps:
# the label corresponding to a declared subset, and the dimensions
# the processing of that subset is expected to take
- label: step1
sharding_dimensions: visit, detector
- label: step2
sharding_dimensions: tract, patch, skymap
.. _pipeline_creating_imports:

-----------
Expand Down
78 changes: 77 additions & 1 deletion python/lsst/pipe/base/pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@ class ImportIR:
"""Boolean attribute to dictate if contracts should be inherited with the
pipeline or not.
"""
importSteps: bool = True
"""Boolean attribute to dictate if steps should be inherited with the
pipeline or not.
"""
labeledSubsetModifyMode: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP
"""Controls how labeled subsets are handled when an import ends up not
including (either through an include or exclusion list) a task label that
Expand Down Expand Up @@ -517,6 +521,9 @@ def toPipelineIR(self) -> "PipelineIR":
for label in subsets_in_exclude:
included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset)

if not self.importSteps:
tmp_pipeline.steps = []

tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels, self.labeledSubsetModifyMode)

if not self.importContracts:
Expand All @@ -533,6 +540,16 @@ def __eq__(self, other: object) -> bool:
)


@dataclass
class StepIR:
"""Intermediate representation of a step definition."""

label: str
"""The label associated with this step."""
sharding_dimensions: list[str]
"""The dimensions to use when sharding this step."""


class PipelineIR:
"""Intermediate representation of a pipeline definition.
Expand Down Expand Up @@ -584,12 +601,18 @@ def __init__(self, loaded_yaml: dict[str, Any]):
# Process any named label subsets
self._read_labeled_subsets(loaded_yaml)

# Process defined sets
self._read_step_declaration(loaded_yaml)

# Process any inherited pipelines
self._read_imports(loaded_yaml)

# verify named subsets, must be done after inheriting
self._verify_labeled_subsets()

# verify steps, must be done after inheriting
self._verify_steps()

def _read_contracts(self, loaded_yaml: dict[str, Any]) -> None:
"""Process the contracts portion of the loaded yaml document
Expand Down Expand Up @@ -639,6 +662,28 @@ def _read_labeled_subsets(self, loaded_yaml: dict[str, Any]) -> None:
for key, value in loaded_subsets.items():
self.labeled_subsets[key] = LabeledSubset.from_primitives(key, value)

def _read_step_declaration(self, loaded_yaml: dict[str, Any]) -> None:
"""Process the steps portion of the loaded yaml document
Steps are subsets that are declared to be normal parts of the overall
processing of the pipeline. Not all subsets need to be a step, as they
can exist for certain targeted processing, such as debugging.
Parameters
----------
loaded_yaml: `dict`
A dictionary which matches the structure that would be produced
by a yaml reader which parses a pipeline definition document
"""
loaded_steps = loaded_yaml.pop("steps", [])
temp_steps: dict[str, StepIR] = {}
for declaration in loaded_steps:
new_step = StepIR(**declaration)
existing = temp_steps.setdefault(new_step.label, new_step)
if existing is not new_step:
raise ValueError(f"Step {existing.label} was declared twice.")
self.steps = [step for step in temp_steps.values()]

def _verify_labeled_subsets(self) -> None:
"""Verify that all the labels in each named subset exist within the
pipeline.
Expand All @@ -656,6 +701,16 @@ def _verify_labeled_subsets(self) -> None:
if label_intersection:
raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}")

def _verify_steps(self) -> None:
"""Verify that all step definitions have a corresponding labeled
subset.
"""
for step in self.steps:
if step.label not in self.labeled_subsets:
raise ValueError(

Check warning on line 710 in python/lsst/pipe/base/pipelineIR.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipelineIR.py#L710

Added line #L710 was not covered by tests
f"{step.label} was declared to be a step, but was not declared to be a labeled subset"
)

def _read_imports(self, loaded_yaml: dict[str, Any]) -> None:
"""Process the inherits portion of the loaded yaml document
Expand Down Expand Up @@ -725,6 +780,7 @@ def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None:
accumulate_tasks: dict[str, TaskIR] = {}
accumulate_labeled_subsets: dict[str, LabeledSubset] = {}
accumulated_parameters = ParametersIR({})
accumulated_steps: dict[str, StepIR] = {}

for tmp_IR in pipelines:
if self.instrument is None:
Expand Down Expand Up @@ -757,6 +813,17 @@ def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None:
)
accumulate_labeled_subsets.update(tmp_IR.labeled_subsets)
accumulated_parameters.update(tmp_IR.parameters)
for tmp_step in tmp_IR.steps:
existing = accumulated_steps.setdefault(tmp_step.label, tmp_step)
if existing != tmp_step:
raise ValueError(

Check warning on line 819 in python/lsst/pipe/base/pipelineIR.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipelineIR.py#L819

Added line #L819 was not covered by tests
f"There were conflicting step definitions in import {tmp_step}, {existing}"
)

for tmp_step in self.steps:
existing = accumulated_steps.setdefault(tmp_step.label, tmp_step)
if existing != tmp_step:
raise ValueError(f"There were conflicting step definitions in import {tmp_step}, {existing}")

# verify that any accumulated labeled subsets dont clash with a label
# from this pipeline
Expand All @@ -783,6 +850,7 @@ def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None:
self.tasks: dict[str, TaskIR] = accumulate_tasks
accumulated_parameters.update(self.parameters)
self.parameters = accumulated_parameters
self.steps = list(accumulated_steps.values())

def _read_tasks(self, loaded_yaml: dict[str, Any]) -> None:
"""Process the tasks portion of the loaded yaml document
Expand Down Expand Up @@ -905,7 +973,7 @@ def subset_from_labels(

# create a copy of the object to iterate over
labeled_subsets = copy.copy(pipeline.labeled_subsets)
# remove any labeled subsets that no longer have a complete set
# remove or edit any labeled subsets that no longer have a complete set
for label, labeled_subset in labeled_subsets.items():
if extraTaskLabels := (labeled_subset.subset - pipeline.tasks.keys()):
match subsetCtrl:
Expand All @@ -915,6 +983,14 @@ def subset_from_labels(
for extra in extraTaskLabels:
labeled_subset.subset.discard(extra)

# remove any steps that correspond to removed subsets
new_steps = []
for step in pipeline.steps:
if step.label not in pipeline.labeled_subsets:
continue

Check warning on line 990 in python/lsst/pipe/base/pipelineIR.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipelineIR.py#L990

Added line #L990 was not covered by tests
new_steps.append(step)
pipeline.steps = new_steps

return pipeline

@classmethod
Expand Down
20 changes: 20 additions & 0 deletions tests/testPipeline5.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
description: Test Pipeline
parameters:
value1: valueA
value2: valueB
tasks:
modA: "test.moduleA"
modB: "test.moduleB"
subsets:
sub1:
subset:
- modA
- modB
sub2:
subset:
- modA
steps:
- label: sub1
sharding_dimensions: ['a', 'b']
- label: sub2
sharding_dimensions: ['a', 'c']
90 changes: 90 additions & 0 deletions tests/test_pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,96 @@ def testImportParsing(self):
with self.assertRaises(ValueError):
PipelineIR.from_string(pipeline_str)

# Test that importing Pipelines with different step definitions fails
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
imports:
- $TESTDIR/testPipeline5.yaml
steps:
- label: sub1
sharding_dimensions: ['a', 'e']
"""
)
with self.assertRaises(ValueError):
PipelineIR.from_string(pipeline_str)

# Test that it does not fail if steps are excluded
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
imports:
- location: $TESTDIR/testPipeline5.yaml
importSteps: false
steps:
- label: sub1
sharding_dimensions: ['a', 'e']
"""
)
PipelineIR.from_string(pipeline_str)

# Test that importing does work
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
imports:
- location: $TESTDIR/testPipeline5.yaml
"""
)
pipeline = PipelineIR.from_string(pipeline_str)
self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"})

def testSteps(self):
# Test that steps definitions are created
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
tasks:
modA: "test.moduleA"
modB: "test.moduleB"
subsets:
sub1:
subset:
- modA
- modB
sub2:
subset:
- modA
steps:
- label: sub1
sharding_dimensions: ['a', 'b']
- label: sub2
sharding_dimensions: ['a', 'b']
"""
)
pipeline = PipelineIR.from_string(pipeline_str)
self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"})

# Test that steps definitions must be unique
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
tasks:
modA: "test.moduleA"
modB: "test.moduleB"
subsets:
sub1:
subset:
- modA
- modB
sub2:
subset:
- modA
steps:
- label: sub1
sharding_dimensions: ['a', 'b']
- label: sub1
sharding_dimensions: ['a', 'b']
"""
)
with self.assertRaises(ValueError):
pipeline = PipelineIR.from_string(pipeline_str)

def testReadParameters(self):
# verify that parameters section are read in from a pipeline
pipeline_str = textwrap.dedent(
Expand Down

0 comments on commit ba8243d

Please sign in to comment.