Skip to content

Commit

Permalink
Merge pull request #309 from lsst/tickets/DM-46525
Browse files Browse the repository at this point in the history
DM-46525: switch back to raising on partial output errors by default
  • Loading branch information
TallJimbo authored Oct 3, 2024
2 parents 6c9b749 + f56ab26 commit 8a8e908
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 10 deletions.
3 changes: 3 additions & 0 deletions doc/changes/DM-46525.misc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Set the default for raising on partial output error to `True`.

Allowing processing to proceed when we encounter an error that may not be fatal is functionality we'll still want eventually, but enabling it by default was premature, since our processing-status reporting tools are yet able to distinguish these cases from unqualified successes.
3 changes: 2 additions & 1 deletion python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,10 @@
)

raise_on_partial_outputs_option = MWOptionDecorator(
"--raise-on-partial-outputs",
"--raise-on-partial-outputs/--no-raise-on-partial-outputs",
help="Consider partial outputs from a task an error instead of a qualified success.",
is_flag=True,
default=True,
)

save_execution_butler_option = MWOptionDecorator(
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/separablePipelineExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def __init__(
skip_existing_in: Iterable[str] | None = None,
task_factory: lsst.pipe.base.TaskFactory | None = None,
resources: lsst.pipe.base.ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
):
self._butler = Butler.from_config(butler=butler, collections=butler.collections, run=butler.run)
if not self._butler.collections:
Expand Down
10 changes: 5 additions & 5 deletions python/lsst/ctrl/mpexec/simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(
quantum_graph: QuantumGraph,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
):
self.quantum_graph = quantum_graph
self.butler = butler
Expand Down Expand Up @@ -148,7 +148,7 @@ def from_pipeline_filename(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an on-disk
pipeline YAML file.
Expand Down Expand Up @@ -201,7 +201,7 @@ def from_task_class(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from a pipeline
containing a single task.
Expand Down Expand Up @@ -268,7 +268,7 @@ def from_pipeline(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline.
Expand Down Expand Up @@ -321,7 +321,7 @@ def from_pipeline_graph(
bind: Mapping[str, Any] | None = None,
butler: Butler,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
) -> SimplePipelineExecutor:
"""Create an executor by building a QuantumGraph from an in-memory
pipeline graph.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(
resources: ExecutionResources | None = None,
skipExisting: bool = False,
assumeNoExistingOutputs: bool = False,
raise_on_partial_outputs: bool = False,
raise_on_partial_outputs: bool = True,
):
self.butler = butler
self.taskFactory = taskFactory
Expand Down
6 changes: 4 additions & 2 deletions tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ def test_partial_outputs_success(self):
pipeline_graph = PipelineGraph()
pipeline_graph.add_task("a", DynamicTestPipelineTask, config_a)
pipeline_graph.add_task("b", DynamicTestPipelineTask, config_b)
# Default behavior is to consider the partial a success and proceed.
executor = SimplePipelineExecutor.from_pipeline_graph(pipeline_graph, butler=self.butler)
# Consider the partial a success and proceed.
executor = SimplePipelineExecutor.from_pipeline_graph(
pipeline_graph, butler=self.butler, raise_on_partial_outputs=False
)
(_, _) = executor.as_generator(register_dataset_types=True)
self.assertFalse(self.butler.exists("intermediate"))
self.assertEqual(self.butler.get("output").storage_class, get_mock_name("StructuredDataDict"))
Expand Down

0 comments on commit 8a8e908

Please sign in to comment.