Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-41650 Add step definitions to pipelines #392

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-41650.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a section to pipelines which allows the explicit declaration of which subsets correspond to steps and the dimensions with which each step's quanta can be sharded.
27 changes: 27 additions & 0 deletions doc/lsst.pipe.base/creating-a-pipeline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,33 @@ Once a ``subset`` is created the label associated with it can be used in
any context where task labels are accepted. Examples of this will be shown
in :ref:`pipeline-running-intro`.

.. _pipeline_creating_steps:

-----
Steps
-----
Subsets are designed to be an encapsulation of a collection of tasks that are
useful to run together. These runs could be for reasons as small as
producing a quick set of QA debugging plots or as large as divisions of the
complete pipeline for survey-level production.

Because divisions of the pipeline have a special place in survey production, pipelines provide a dedicated location where they are highlighted: the ``steps`` key.
This key is a list of the labels for all subsets that are considered steps in end to end processing.
Alongside each label, a step must be declared with the set of dimensions the step is expected to run over.
An example of what the step syntax looks like can be seen in the example below.
These bits of information allow campaign management / batch production software to better reason about how to handle the processing workflow found within a pipeline.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
These bits of information allow campaign management / batch production software better reason about how to handle the processing workflow found withing a pipeline.
These bits of information allow campaign management / batch production software to better reason about how to handle the processing workflow found within a pipeline.


.. code-block:: yaml

steps:
# the label corresponding to a declared subset, and the dimensions
# the processing of that subset is expected to take
- label: step1
sharding_dimensions: visit, detector
- label: step2
sharding_dimensions: tract, patch, skymap


.. _pipeline_creating_imports:

-----------
Expand Down
78 changes: 77 additions & 1 deletion python/lsst/pipe/base/pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,10 @@
"""Boolean attribute to dictate if contracts should be inherited with the
pipeline or not.
"""
importSteps: bool = True
"""Boolean attribute to dictate if steps should be inherited with the
pipeline or not.
"""
labeledSubsetModifyMode: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP
"""Controls how labeled subsets are handled when an import ends up not
including (either through an include or exclusion list) a task label that
Expand Down Expand Up @@ -517,6 +521,9 @@
for label in subsets_in_exclude:
included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset)

if not self.importSteps:
tmp_pipeline.steps = []

tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels, self.labeledSubsetModifyMode)

if not self.importContracts:
Expand All @@ -533,6 +540,16 @@
)


@dataclass
class StepIR:
    """Intermediate representation of a step definition.

    A step is a labeled subset that has been declared to be a normal unit
    of end-to-end processing, together with the dimensions its quanta are
    expected to be sharded over.
    """

    label: str
    """The label associated with this step; it must match the label of a
    declared labeled subset (verified after imports are processed).
    """
    sharding_dimensions: list[str]
    """The dimensions to use when sharding this step."""


class PipelineIR:
"""Intermediate representation of a pipeline definition.

Expand Down Expand Up @@ -584,12 +601,18 @@
# Process any named label subsets
self._read_labeled_subsets(loaded_yaml)

# Process defined sets
self._read_step_declaration(loaded_yaml)

# Process any inherited pipelines
self._read_imports(loaded_yaml)

# verify named subsets, must be done after inheriting
self._verify_labeled_subsets()

# verify steps, must be done after inheriting
self._verify_steps()

def _read_contracts(self, loaded_yaml: dict[str, Any]) -> None:
"""Process the contracts portion of the loaded yaml document

Expand Down Expand Up @@ -639,6 +662,28 @@
for key, value in loaded_subsets.items():
self.labeled_subsets[key] = LabeledSubset.from_primitives(key, value)

def _read_step_declaration(self, loaded_yaml: dict[str, Any]) -> None:
    """Process the steps portion of the loaded yaml document.

    Steps are subsets that are declared to be normal parts of the overall
    processing of the pipeline. Not all subsets need to be a step, as they
    can exist for certain targeted processing, such as debugging.

    Parameters
    ----------
    loaded_yaml : `dict`
        A dictionary which matches the structure that would be produced
        by a yaml reader which parses a pipeline definition document.

    Raises
    ------
    ValueError
        Raised if the same label is declared as a step more than once.
    """
    loaded_steps = loaded_yaml.pop("steps", [])
    temp_steps: dict[str, StepIR] = {}
    for declaration in loaded_steps:
        new_step = StepIR(**declaration)
        # setdefault returns the already-stored step (if any), which lets
        # us detect duplicate declarations of the same label by identity.
        existing = temp_steps.setdefault(new_step.label, new_step)
        if existing is not new_step:
            raise ValueError(f"Step {existing.label} was declared twice.")
    self.steps = list(temp_steps.values())

def _verify_labeled_subsets(self) -> None:
"""Verify that all the labels in each named subset exist within the
pipeline.
Expand All @@ -656,6 +701,16 @@
if label_intersection:
raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}")

def _verify_steps(self) -> None:
    """Verify that all step definitions have a corresponding labeled
    subset.

    Raises
    ------
    ValueError
        Raised if a step references a label that was not declared as a
        labeled subset. Must be called after imports are processed, so
        that inherited subsets are visible.
    """
    declared_subsets = self.labeled_subsets
    for step in self.steps:
        if step.label in declared_subsets:
            continue
        raise ValueError(
            f"{step.label} was declared to be a step, but was not declared to be a labeled subset"
        )

def _read_imports(self, loaded_yaml: dict[str, Any]) -> None:
"""Process the inherits portion of the loaded yaml document

Expand Down Expand Up @@ -725,6 +780,7 @@
accumulate_tasks: dict[str, TaskIR] = {}
accumulate_labeled_subsets: dict[str, LabeledSubset] = {}
accumulated_parameters = ParametersIR({})
accumulated_steps: dict[str, StepIR] = {}

for tmp_IR in pipelines:
if self.instrument is None:
Expand Down Expand Up @@ -757,6 +813,17 @@
)
accumulate_labeled_subsets.update(tmp_IR.labeled_subsets)
accumulated_parameters.update(tmp_IR.parameters)
for tmp_step in tmp_IR.steps:
existing = accumulated_steps.setdefault(tmp_step.label, tmp_step)
if existing != tmp_step:
raise ValueError(

Check warning on line 819 in python/lsst/pipe/base/pipelineIR.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipelineIR.py#L819

Added line #L819 was not covered by tests
f"There were conflicting step definitions in import {tmp_step}, {existing}"
)

for tmp_step in self.steps:
existing = accumulated_steps.setdefault(tmp_step.label, tmp_step)
if existing != tmp_step:
raise ValueError(f"There were conflicting step definitions in import {tmp_step}, {existing}")

# verify that any accumulated labeled subsets dont clash with a label
# from this pipeline
Expand All @@ -783,6 +850,7 @@
self.tasks: dict[str, TaskIR] = accumulate_tasks
accumulated_parameters.update(self.parameters)
self.parameters = accumulated_parameters
self.steps = list(accumulated_steps.values())

def _read_tasks(self, loaded_yaml: dict[str, Any]) -> None:
"""Process the tasks portion of the loaded yaml document
Expand Down Expand Up @@ -905,7 +973,7 @@

# create a copy of the object to iterate over
labeled_subsets = copy.copy(pipeline.labeled_subsets)
# remove any labeled subsets that no longer have a complete set
# remove or edit any labeled subsets that no longer have a complete set
for label, labeled_subset in labeled_subsets.items():
if extraTaskLabels := (labeled_subset.subset - pipeline.tasks.keys()):
match subsetCtrl:
Expand All @@ -915,6 +983,14 @@
for extra in extraTaskLabels:
labeled_subset.subset.discard(extra)

# remove any steps that correspond to removed subsets
new_steps = []
for step in pipeline.steps:
if step.label not in pipeline.labeled_subsets:
continue

Check warning on line 990 in python/lsst/pipe/base/pipelineIR.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipelineIR.py#L990

Added line #L990 was not covered by tests
new_steps.append(step)
pipeline.steps = new_steps

return pipeline

@classmethod
Expand Down
20 changes: 20 additions & 0 deletions tests/testPipeline5.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Test fixture: a pipeline declaring two labeled subsets (sub1, sub2),
# both of which are declared as steps with sharding dimensions.
# Referenced from test_pipelineIR.py via $TESTDIR/testPipeline5.yaml to
# exercise step parsing and step merging on import.
description: Test Pipeline
parameters:
  value1: valueA
  value2: valueB
tasks:
  modA: "test.moduleA"
  modB: "test.moduleB"
subsets:
  sub1:
    subset:
      - modA
      - modB
  sub2:
    subset:
      - modA
steps:
  - label: sub1
    sharding_dimensions: ['a', 'b']
  - label: sub2
    sharding_dimensions: ['a', 'c']
90 changes: 90 additions & 0 deletions tests/test_pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,96 @@ def testImportParsing(self):
with self.assertRaises(ValueError):
PipelineIR.from_string(pipeline_str)

# Test that importing Pipelines with different step definitions fails
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
imports:
- $TESTDIR/testPipeline5.yaml
steps:
- label: sub1
sharding_dimensions: ['a', 'e']
"""
)
with self.assertRaises(ValueError):
PipelineIR.from_string(pipeline_str)

# Test that it does not fail if steps are excluded
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
imports:
- location: $TESTDIR/testPipeline5.yaml
importSteps: false
steps:
- label: sub1
sharding_dimensions: ['a', 'e']
"""
)
PipelineIR.from_string(pipeline_str)

# Test that importing does work
pipeline_str = textwrap.dedent(
"""
description: Test Pipeline
imports:
- location: $TESTDIR/testPipeline5.yaml
"""
)
pipeline = PipelineIR.from_string(pipeline_str)
self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"})

def testSteps(self):
    """Test parsing of the steps section of a pipeline definition."""
    # A pipeline whose steps each reference a declared subset should
    # parse cleanly and expose every declared step.
    valid_pipeline = textwrap.dedent(
        """
        description: Test Pipeline
        tasks:
          modA: "test.moduleA"
          modB: "test.moduleB"
        subsets:
          sub1:
            subset:
              - modA
              - modB
          sub2:
            subset:
              - modA
        steps:
          - label: sub1
            sharding_dimensions: ['a', 'b']
          - label: sub2
            sharding_dimensions: ['a', 'b']
        """
    )
    parsed = PipelineIR.from_string(valid_pipeline)
    self.assertEqual({step.label for step in parsed.steps}, {"sub1", "sub2"})

    # Declaring the same label as a step twice must be rejected.
    duplicate_pipeline = textwrap.dedent(
        """
        description: Test Pipeline
        tasks:
          modA: "test.moduleA"
          modB: "test.moduleB"
        subsets:
          sub1:
            subset:
              - modA
              - modB
          sub2:
            subset:
              - modA
        steps:
          - label: sub1
            sharding_dimensions: ['a', 'b']
          - label: sub1
            sharding_dimensions: ['a', 'b']
        """
    )
    with self.assertRaises(ValueError):
        PipelineIR.from_string(duplicate_pipeline)

def testReadParameters(self):
# verify that parameters section are read in from a pipeline
pipeline_str = textwrap.dedent(
Expand Down
Loading