From ba8243db90d547dc50815601403fcda178057bf5 Mon Sep 17 00:00:00 2001 From: Nate Lust Date: Wed, 29 Nov 2023 08:42:45 -0500 Subject: [PATCH] Add step definitions section to Pipelines Added a directive to Pipelines that allows specifying which subsets correspond to an end to end processing step and what dimensions that those steps are expected to operate on. --- doc/changes/DM-41650.feature.md | 1 + doc/lsst.pipe.base/creating-a-pipeline.rst | 27 +++++++ python/lsst/pipe/base/pipelineIR.py | 78 ++++++++++++++++++- tests/testPipeline5.yaml | 20 +++++ tests/test_pipelineIR.py | 90 ++++++++++++++++++++++ 5 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 doc/changes/DM-41650.feature.md create mode 100644 tests/testPipeline5.yaml diff --git a/doc/changes/DM-41650.feature.md b/doc/changes/DM-41650.feature.md new file mode 100644 index 000000000..150259bb6 --- /dev/null +++ b/doc/changes/DM-41650.feature.md @@ -0,0 +1 @@ +Added a section to pipelines which allows the explicit declaration of which susbsets correspond to steps and the dimensions the step's quanta can be sharded with. \ No newline at end of file diff --git a/doc/lsst.pipe.base/creating-a-pipeline.rst b/doc/lsst.pipe.base/creating-a-pipeline.rst index a123a3ea4..0c6e2ce40 100644 --- a/doc/lsst.pipe.base/creating-a-pipeline.rst +++ b/doc/lsst.pipe.base/creating-a-pipeline.rst @@ -341,6 +341,33 @@ Once a ``subset`` is created the label associated with it can be used in any context where task labels are accepted. Examples of this will be shown in :ref:`pipeline-running-intro`. +.. _pipeline_creating_steps: + +----- +Steps +----- +Subsets are designed to be an encapsulation of a collection of tasks that are +useful to be run together. These runs could be for reasons as small as a +producing quick set of QA debugging plots or as large as divisions of the +complete pipeline for survey level production. + +Because divisions of the pipeline have a special place in survey production, pipelines have a special place where they are highlighted, the ``steps`` key. +This key is a list of the labels for all subsets that are considered steps in end to end processing. +Alongside each label, a step must be declared with the set of dimensions the step is expected to run over. +An example of what the step syntax looks like can be seen in the example below. +These bits of information allow campaign management / batch production software better reason about how to handle the processing workflow found withing a pipeline. + +.. code-block:: yaml + + steps: + # the label corresponding to a declared subset, and the dimensions + # the processing of that subset is expected to take + - label: step1 + sharding_dimensions: visit, detector + - label: step2 + sharding_dimensions: tract, patch, skymap + + .. _pipeline_creating_imports: ----------- diff --git a/python/lsst/pipe/base/pipelineIR.py b/python/lsst/pipe/base/pipelineIR.py index d02e17c40..6bc0ad2c7 100644 --- a/python/lsst/pipe/base/pipelineIR.py +++ b/python/lsst/pipe/base/pipelineIR.py @@ -464,6 +464,10 @@ class ImportIR: """Boolean attribute to dictate if contracts should be inherited with the pipeline or not. """ + importSteps: bool = True + """Boolean attribute to dictate if steps should be inherited with the + pipeline or not. + """ labeledSubsetModifyMode: PipelineSubsetCtrl = PipelineSubsetCtrl.DROP """Controls how labeled subsets are handled when an import ends up not including (either through an include or exclusion list) a task label that @@ -517,6 +521,9 @@ def toPipelineIR(self) -> "PipelineIR": for label in subsets_in_exclude: included_labels.difference_update(tmp_pipeline.labeled_subsets[label].subset) + if not self.importSteps: + tmp_pipeline.steps = [] + tmp_pipeline = tmp_pipeline.subset_from_labels(included_labels, self.labeledSubsetModifyMode) if not self.importContracts: @@ -533,6 +540,16 @@ def __eq__(self, other: object) -> bool: ) +@dataclass +class StepIR: + """Intermediate representation of a step definition.""" + + label: str + """The label associated with this step.""" + sharding_dimensions: list[str] + """The dimensions to use when sharding this step.""" + + class PipelineIR: """Intermediate representation of a pipeline definition. @@ -584,12 +601,18 @@ def __init__(self, loaded_yaml: dict[str, Any]): # Process any named label subsets self._read_labeled_subsets(loaded_yaml) + # Process defined sets + self._read_step_declaration(loaded_yaml) + # Process any inherited pipelines self._read_imports(loaded_yaml) # verify named subsets, must be done after inheriting self._verify_labeled_subsets() + # verify steps, must be done after inheriting + self._verify_steps() + def _read_contracts(self, loaded_yaml: dict[str, Any]) -> None: """Process the contracts portion of the loaded yaml document @@ -639,6 +662,28 @@ def _read_labeled_subsets(self, loaded_yaml: dict[str, Any]) -> None: for key, value in loaded_subsets.items(): self.labeled_subsets[key] = LabeledSubset.from_primitives(key, value) + def _read_step_declaration(self, loaded_yaml: dict[str, Any]) -> None: + """Process the steps portion of the loaded yaml document + + Steps are subsets that are declared to be normal parts of the overall + processing of the pipeline. Not all subsets need to be a step, as they + can exist for certain targeted processing, such as debugging. + + Parameters + ---------- + loaded_yaml: `dict` + A dictionary which matches the structure that would be produced + by a yaml reader which parses a pipeline definition document + """ + loaded_steps = loaded_yaml.pop("steps", []) + temp_steps: dict[str, StepIR] = {} + for declaration in loaded_steps: + new_step = StepIR(**declaration) + existing = temp_steps.setdefault(new_step.label, new_step) + if existing is not new_step: + raise ValueError(f"Step {existing.label} was declared twice.") + self.steps = [step for step in temp_steps.values()] + def _verify_labeled_subsets(self) -> None: """Verify that all the labels in each named subset exist within the pipeline. @@ -656,6 +701,16 @@ def _verify_labeled_subsets(self) -> None: if label_intersection: raise ValueError(f"Labeled subsets can not use the same label as a task: {label_intersection}") + def _verify_steps(self) -> None: + """Verify that all step definitions have a corresponding labeled + subset. + """ + for step in self.steps: + if step.label not in self.labeled_subsets: + raise ValueError( + f"{step.label} was declared to be a step, but was not declared to be a labeled subset" + ) + def _read_imports(self, loaded_yaml: dict[str, Any]) -> None: """Process the inherits portion of the loaded yaml document @@ -725,6 +780,7 @@ def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None: accumulate_tasks: dict[str, TaskIR] = {} accumulate_labeled_subsets: dict[str, LabeledSubset] = {} accumulated_parameters = ParametersIR({}) + accumulated_steps: dict[str, StepIR] = {} for tmp_IR in pipelines: if self.instrument is None: @@ -757,6 +813,17 @@ def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None: ) accumulate_labeled_subsets.update(tmp_IR.labeled_subsets) accumulated_parameters.update(tmp_IR.parameters) + for tmp_step in tmp_IR.steps: + existing = accumulated_steps.setdefault(tmp_step.label, tmp_step) + if existing != tmp_step: + raise ValueError( + f"There were conflicting step definitions in import {tmp_step}, {existing}" + ) + + for tmp_step in self.steps: + existing = accumulated_steps.setdefault(tmp_step.label, tmp_step) + if existing != tmp_step: + raise ValueError(f"There were conflicting step definitions in import {tmp_step}, {existing}") # verify that any accumulated labeled subsets dont clash with a label # from this pipeline @@ -783,6 +850,7 @@ def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None: self.tasks: dict[str, TaskIR] = accumulate_tasks accumulated_parameters.update(self.parameters) self.parameters = accumulated_parameters + self.steps = list(accumulated_steps.values()) def _read_tasks(self, loaded_yaml: dict[str, Any]) -> None: """Process the tasks portion of the loaded yaml document @@ -905,7 +973,7 @@ def subset_from_labels( # create a copy of the object to iterate over labeled_subsets = copy.copy(pipeline.labeled_subsets) - # remove any labeled subsets that no longer have a complete set + # remove or edit any labeled subsets that no longer have a complete set for label, labeled_subset in labeled_subsets.items(): if extraTaskLabels := (labeled_subset.subset - pipeline.tasks.keys()): match subsetCtrl: @@ -915,6 +983,14 @@ def subset_from_labels( for extra in extraTaskLabels: labeled_subset.subset.discard(extra) + # remove any steps that correspond to removed subsets + new_steps = [] + for step in pipeline.steps: + if step.label not in pipeline.labeled_subsets: + continue + new_steps.append(step) + pipeline.steps = new_steps + return pipeline @classmethod diff --git a/tests/testPipeline5.yaml b/tests/testPipeline5.yaml new file mode 100644 index 000000000..993963b22 --- /dev/null +++ b/tests/testPipeline5.yaml @@ -0,0 +1,20 @@ +description: Test Pipeline +parameters: + value1: valueA + value2: valueB +tasks: + modA: "test.moduleA" + modB: "test.moduleB" +subsets: + sub1: + subset: + - modA + - modB + sub2: + subset: + - modA +steps: + - label: sub1 + sharding_dimensions: ['a', 'b'] + - label: sub2 + sharding_dimensions: ['a', 'c'] diff --git a/tests/test_pipelineIR.py b/tests/test_pipelineIR.py index 23554a60b..b9323e4c8 100644 --- a/tests/test_pipelineIR.py +++ b/tests/test_pipelineIR.py @@ -335,6 +335,96 @@ def testImportParsing(self): with self.assertRaises(ValueError): PipelineIR.from_string(pipeline_str) + # Test that importing Pipelines with different step definitions fails + pipeline_str = textwrap.dedent( + """ + description: Test Pipeline + imports: + - $TESTDIR/testPipeline5.yaml + steps: + - label: sub1 + sharding_dimensions: ['a', 'e'] + """ + ) + with self.assertRaises(ValueError): + PipelineIR.from_string(pipeline_str) + + # Test that it does not fail if steps are excluded + pipeline_str = textwrap.dedent( + """ + description: Test Pipeline + imports: + - location: $TESTDIR/testPipeline5.yaml + importSteps: false + steps: + - label: sub1 + sharding_dimensions: ['a', 'e'] + """ + ) + PipelineIR.from_string(pipeline_str) + + # Test that importing does work + pipeline_str = textwrap.dedent( + """ + description: Test Pipeline + imports: + - location: $TESTDIR/testPipeline5.yaml + """ + ) + pipeline = PipelineIR.from_string(pipeline_str) + self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"}) + + def testSteps(self): + # Test that steps definitions are created + pipeline_str = textwrap.dedent( + """ + description: Test Pipeline + tasks: + modA: "test.moduleA" + modB: "test.moduleB" + subsets: + sub1: + subset: + - modA + - modB + sub2: + subset: + - modA + steps: + - label: sub1 + sharding_dimensions: ['a', 'b'] + - label: sub2 + sharding_dimensions: ['a', 'b'] + """ + ) + pipeline = PipelineIR.from_string(pipeline_str) + self.assertEqual(set(step.label for step in pipeline.steps), {"sub1", "sub2"}) + + # Test that steps definitions must be unique + pipeline_str = textwrap.dedent( + """ + description: Test Pipeline + tasks: + modA: "test.moduleA" + modB: "test.moduleB" + subsets: + sub1: + subset: + - modA + - modB + sub2: + subset: + - modA + steps: + - label: sub1 + sharding_dimensions: ['a', 'b'] + - label: sub1 + sharding_dimensions: ['a', 'b'] + """ + ) + with self.assertRaises(ValueError): + pipeline = PipelineIR.from_string(pipeline_str) + def testReadParameters(self): # verify that parameters section are read in from a pipeline pipeline_str = textwrap.dedent(