diff --git a/doc/changes/DM-41605.feature.md b/doc/changes/DM-41605.feature.md
new file mode 100644
index 00000000..034982aa
--- /dev/null
+++ b/doc/changes/DM-41605.feature.md
@@ -0,0 +1,6 @@
+Aggregate multiple `pipetask report` outputs into one holistic `Summary`.
+
+While the `QuantumProvenanceGraph` resolves processing over
+data-query-identified groups, `pipetask aggregate-reports` combines
+multiple group-level reports into one that totals the successes,
+issues, and failures over the same section of the pipeline.
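
For context, a minimal sketch of the workflow this feature enables, assuming `Summary` lives in `lsst.pipe.base.quantum_provenance_graph` and two hypothetical per-group report files written by `pipetask report`:

```python
# A sketch only: the report file names are hypothetical.
from lsst.pipe.base.quantum_provenance_graph import Summary

summaries = []
for filename in ("group1_report.json", "group2_report.json"):
    with open(filename) as f:
        # Each file holds one group-level QuantumProvenanceGraph.Summary.
        summaries.append(Summary.model_validate_json(f.read()))

# Counts such as n_expected and n_successful are totaled per task (and per
# dataset type) across the groups.
aggregate = Summary.aggregate(summaries)
for label, task_summary in aggregate.tasks.items():
    print(label, task_summary.n_expected, task_summary.n_successful)
```
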
diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py
index a2461268..db16e2f0 100644
--- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py
+++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py
@@ -26,7 +26,7 @@
# along with this program. If not, see .
import sys
-from collections.abc import Iterable, Iterator, Sequence
+from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from functools import partial
from tempfile import NamedTemporaryFile
@@ -433,7 +433,7 @@ def report(
" also printed to the screen when using the --full-output-filename option.",
)
def aggregate_reports(
- filenames: Iterable[str], full_output_filename: str | None, brief: bool = False
+ filenames: Sequence[str], full_output_filename: str | None, brief: bool = False
) -> None:
"""Aggregate pipetask report output on disjoint data-id groups into one
Summary over common tasks and datasets. Intended for use when the same
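
The command can be exercised in-process with click's test runner; a sketch, assuming the report paths are accepted as positional arguments (as the `filenames` parameter above suggests) and that the hypothetical input files already exist:

```python
# A sketch only: report file names are hypothetical.
from click.testing import CliRunner

from lsst.ctrl.mpexec.cli.cmd.commands import aggregate_reports

runner = CliRunner()
result = runner.invoke(
    aggregate_reports,
    ["group1_report.json", "group2_report.json", "--full-output-filename", "aggregate.json"],
)
assert result.exit_code == 0, result.output
print(result.output)  # human-readable aggregate summary
```
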
diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py
index 63d0828a..bb88b4b9 100644
--- a/python/lsst/ctrl/mpexec/cli/script/report.py
+++ b/python/lsst/ctrl/mpexec/cli/script/report.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import pprint
-from collections.abc import Iterable, Sequence
-from typing import Any
+from collections.abc import Sequence
from astropy.table import Table
from lsst.daf.butler import Butler
@@ -196,18 +195,34 @@ def report_v2(
def aggregate_reports(
- filenames: Iterable[str], full_output_filename: str | None, brief: bool = False
+ filenames: Sequence[str], full_output_filename: str | None, brief: bool = False
) -> None:
- """Docstring.
+ """Aggregrate multiple `QuantumProvenanceGraph` summaries on separate
+ dataquery-identified groups into one wholistic report. This is intended for
+ reports over the same tasks in the same pipeline, after `pipetask report`
+ has been resolved over all graphs associated with each group.
- open a bunch of json files, call model_validate_json, call aggregrate,
- print summary
+ Parameters
+ ----------
+ filenames : `Sequence` [`str`]
+ The paths to the JSON files produced by `pipetask report` (note: this
+ is only compatible with the multi-graph or `--force-v2` option). These
+ files correspond to the `QuantumProvenanceGraph.Summary` objects
+ produced for each group.
+ full_output_filename : `str` or `None`
+ The name of the JSON file in which to store the aggregate report, if
+ passed. This is forwarded to `print_summary` at the end of this
+ function.
+ brief : `bool`, optional
+ Only display a short (counts-only) summary on stdout. This includes
+ counts but not error messages or data IDs (similar to a BPS report).
+ This option will still report all `cursed` datasets and `wonky`
+ quanta. This is forwarded to `print_summary` at the end of this
+ function.
"""
- summaries: Iterable[Summary] = []
+ summaries: list[Summary] = []
for filename in filenames:
with open(filename) as f:
model = Summary.model_validate_json(f.read())
summaries.append(model)
result = Summary.aggregate(summaries)
print_summary(result, full_output_filename, brief)
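
The loop above relies on `Summary` being a pydantic v2 model (hence `model_validate_json`), so the JSON written by `pipetask report` round-trips losslessly. A sketch of that contract, assuming an empty `Summary` is default-constructible:

```python
# A sketch only: demonstrates the serialization contract the reading loop
# relies on; Summary is assumed to be default-constructible here.
from lsst.pipe.base.quantum_provenance_graph import Summary

summary = Summary()  # real summaries come from `pipetask report`
serialized = summary.model_dump_json()

# For pydantic v2 models, model_validate_json() inverts model_dump_json(),
# so the restored object compares equal to the original.
restored = Summary.model_validate_json(serialized)
assert restored == summary
```
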
diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py
index 1c69a35c..947753cc 100644
--- a/tests/test_cliCmdReport.py
+++ b/tests/test_cliCmdReport.py
@@ -231,7 +231,7 @@ def test_report(self):
for task_summary in model.tasks.values():
self.assertEqual(task_summary.n_successful, 0)
self.assertEqual(task_summary.n_blocked, 0)
- self.assertEqual(task_summary.n_not_attempted, 1)
+ self.assertEqual(task_summary.n_unknown, 1)
self.assertEqual(task_summary.n_expected, 1)
self.assertListEqual(task_summary.failed_quanta, [])
self.assertListEqual(task_summary.recovered_quanta, [])
@@ -243,8 +243,8 @@ def test_report(self):
dataset_type_summary.unsuccessful_datasets,
[{"instrument": "INSTR", "detector": 0}],
)
- self.assertEqual(dataset_type_summary.n_published, 0)
- self.assertEqual(dataset_type_summary.n_unpublished, 0)
+ self.assertEqual(dataset_type_summary.n_visible, 0)
+ self.assertEqual(dataset_type_summary.n_shadowed, 0)
self.assertEqual(dataset_type_summary.n_predicted_only, 0)
self.assertEqual(dataset_type_summary.n_expected, 1)
self.assertEqual(dataset_type_summary.n_cursed, 0)
@@ -327,7 +327,7 @@ def test_aggregate_reports(self):
for task_label, task_summary in agg_sum.tasks.items():
self.assertEqual(task_summary.n_successful, 0)
self.assertEqual(task_summary.n_blocked, 0)
- self.assertEqual(task_summary.n_not_attempted, 2)
+ self.assertEqual(task_summary.n_unknown, 2)
self.assertEqual(task_summary.n_expected, 2)
self.assertListEqual(task_summary.failed_quanta, [])
self.assertListEqual(task_summary.recovered_quanta, [])
@@ -339,8 +339,8 @@ def test_aggregate_reports(self):
dataset_type_summary.unsuccessful_datasets,
[{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}],
)
- self.assertEqual(dataset_type_summary.n_published, 0)
- self.assertEqual(dataset_type_summary.n_unpublished, 0)
+ self.assertEqual(dataset_type_summary.n_visible, 0)
+ self.assertEqual(dataset_type_summary.n_shadowed, 0)
self.assertEqual(dataset_type_summary.n_predicted_only, 0)
self.assertEqual(dataset_type_summary.n_expected, 2)
self.assertEqual(dataset_type_summary.n_cursed, 0)
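
Outside the test harness, the same totaling behavior can be checked against the file written by `--full-output-filename`; a sketch, assuming that file contains the serialized aggregate `Summary` and that "aggregate.json" is the hypothetical output from the earlier invocation:

```python
# A sketch only: verifies that per-task counts are totals over the two
# input groups, mirroring the assertions in test_aggregate_reports.
from lsst.pipe.base.quantum_provenance_graph import Summary

with open("aggregate.json") as f:
    agg = Summary.model_validate_json(f.read())

for task_summary in agg.tasks.values():
    assert task_summary.n_expected == 2
```
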