From e55b6922920b0e39463ae92dff61573365eed2ca Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Thu, 1 Aug 2024 11:20:26 -0700 Subject: [PATCH 1/3] Aggregate pipetask report output from disjoint groups into one report --- python/lsst/ctrl/mpexec/cli/cmd/__init__.py | 3 +- python/lsst/ctrl/mpexec/cli/cmd/commands.py | 37 ++++++++++++++++++- .../lsst/ctrl/mpexec/cli/script/__init__.py | 2 +- python/lsst/ctrl/mpexec/cli/script/report.py | 18 ++++++++- 4 files changed, 56 insertions(+), 4 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/__init__.py b/python/lsst/ctrl/mpexec/cli/cmd/__init__.py index f5f1caf3..5330afbe 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/__init__.py @@ -26,6 +26,7 @@ # along with this program. If not, see . __all__ = [ + "aggregate_reports", "build", "cleanup", "pre_exec_init_qbb", @@ -38,4 +39,4 @@ ] -from .commands import build, cleanup, pre_exec_init_qbb, purge, qgraph, report, run, run_qbb, update_graph_run +from .commands import aggregate_reports, build, cleanup, pre_exec_init_qbb, purge, qgraph, report, run, run_qbb, update_graph_run diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index 5d26938a..367c358f 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -26,7 +26,7 @@ # along with this program. If not, see . import sys -from collections.abc import Iterator, Sequence +from collections.abc import Iterator, Sequence, Iterable from contextlib import contextmanager from functools import partial from tempfile import NamedTemporaryFile @@ -416,3 +416,38 @@ def report( else: assert len(qgraphs) == 1, "Cannot make a report without a quantum graph." script.report(repo, qgraphs[0], full_output_filename, logs, brief) + +@click.command(cls=PipetaskCommand) +@click.argument("filenames", nargs=-1) +@click.option( + "--full-output-filename", + default="", + help="Output report as a file with this name (json).", +) +@click.option( + "--brief", + default=False, + is_flag=True, + help="Only show counts in report (a brief summary). Note that counts are" + " also printed to the screen when using the --full-output-filename option.", +) +def aggregate_reports( + filenames: Iterable[str], + full_output_filename: str | None, + brief: bool = False +) -> None: + """Aggregate pipetask report output on disjoint data-id groups into one + Summary over common tasks and datasets. Intended for use when the same + pipeline has been run over all groups (i.e., to aggregate all reports + for a given step). This functionality is only compatible with reports + from the `QuantumProvenanceGraph`, so the reports must be run over multiple + groups or with the `--force-v2` option. + + Save the report as a file (`--full-output-filename`) or print it to stdout + (default). If the terminal is overwhelmed with data_ids from failures try + the `--brief` option. + + FILENAMES are the space-separated paths to json file output created by + pipetask report. 
+ """ + script.aggregate_reports(filenames, full_output_filename, brief) diff --git a/python/lsst/ctrl/mpexec/cli/script/__init__.py b/python/lsst/ctrl/mpexec/cli/script/__init__.py index dc14543e..399ebeeb 100644 --- a/python/lsst/ctrl/mpexec/cli/script/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/script/__init__.py @@ -31,7 +31,7 @@ from .pre_exec_init_qbb import pre_exec_init_qbb from .purge import PurgeResult, purge from .qgraph import qgraph -from .report import report, report_v2 +from .report import report, report_v2, aggregate_reports from .run import run from .run_qbb import run_qbb from .update_graph_run import update_graph_run diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 481d0eb0..69d1a792 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -25,7 +25,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pprint -from collections.abc import Sequence +import time +from collections.abc import Sequence, Iterable +from typing import Any from astropy.table import Table from lsst.daf.butler import Butler @@ -194,6 +196,20 @@ def report_v2( print_summary(summary, full_output_filename, brief) +def aggregate_reports(filenames: Iterable[str], full_output_filename: str | None, brief: bool = False) -> None: + """Docstring. + + open a bunch of json files, call model_validate_json, call aggregrate, print summary + """ + summaries : Iterable[Summary] = [] + for filename in filenames: + with open(filename) as f: + model = Summary.model_validate_json(f.read()) + summaries.append(model) + result = Summary.aggregate(summaries) + print_summary(result, full_output_filename, brief) + + def print_summary(summary: Summary, full_output_filename: str | None, brief: bool = False) -> None: """Take a `QuantumProvenanceGraph.Summary` object and write it to a file and/or the screen. From e33646e0e731bf2822864a5cabe4d2b24c95e45e Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 13 Aug 2024 15:38:53 -0700 Subject: [PATCH 2/3] Tests for pipetask aggregate-reports --- python/lsst/ctrl/mpexec/cli/cmd/__init__.py | 13 +- python/lsst/ctrl/mpexec/cli/cmd/commands.py | 9 +- .../lsst/ctrl/mpexec/cli/script/__init__.py | 2 +- python/lsst/ctrl/mpexec/cli/script/report.py | 12 +- requirements.txt | 2 +- tests/test_cliCmdReport.py | 440 ++++++------------ 6 files changed, 180 insertions(+), 298 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/__init__.py b/python/lsst/ctrl/mpexec/cli/cmd/__init__.py index 5330afbe..d90812a9 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/__init__.py @@ -39,4 +39,15 @@ ] -from .commands import aggregate_reports, build, cleanup, pre_exec_init_qbb, purge, qgraph, report, run, run_qbb, update_graph_run +from .commands import ( + aggregate_reports, + build, + cleanup, + pre_exec_init_qbb, + purge, + qgraph, + report, + run, + run_qbb, + update_graph_run, +) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index 367c358f..a2461268 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -26,7 +26,7 @@ # along with this program. If not, see . 
import sys -from collections.abc import Iterator, Sequence, Iterable +from collections.abc import Iterable, Iterator, Sequence from contextlib import contextmanager from functools import partial from tempfile import NamedTemporaryFile @@ -417,6 +417,7 @@ def report( assert len(qgraphs) == 1, "Cannot make a report without a quantum graph." script.report(repo, qgraphs[0], full_output_filename, logs, brief) + @click.command(cls=PipetaskCommand) @click.argument("filenames", nargs=-1) @click.option( @@ -432,9 +433,7 @@ def report( " also printed to the screen when using the --full-output-filename option.", ) def aggregate_reports( - filenames: Iterable[str], - full_output_filename: str | None, - brief: bool = False + filenames: Iterable[str], full_output_filename: str | None, brief: bool = False ) -> None: """Aggregate pipetask report output on disjoint data-id groups into one Summary over common tasks and datasets. Intended for use when the same @@ -442,7 +441,7 @@ def aggregate_reports( for a given step). This functionality is only compatible with reports from the `QuantumProvenanceGraph`, so the reports must be run over multiple groups or with the `--force-v2` option. - + Save the report as a file (`--full-output-filename`) or print it to stdout (default). If the terminal is overwhelmed with data_ids from failures try the `--brief` option. diff --git a/python/lsst/ctrl/mpexec/cli/script/__init__.py b/python/lsst/ctrl/mpexec/cli/script/__init__.py index 399ebeeb..e195c8bd 100644 --- a/python/lsst/ctrl/mpexec/cli/script/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/script/__init__.py @@ -31,7 +31,7 @@ from .pre_exec_init_qbb import pre_exec_init_qbb from .purge import PurgeResult, purge from .qgraph import qgraph -from .report import report, report_v2, aggregate_reports +from .report import aggregate_reports, report, report_v2 from .run import run from .run_qbb import run_qbb from .update_graph_run import update_graph_run diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 69d1a792..63d0828a 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pprint -import time -from collections.abc import Sequence, Iterable +from collections.abc import Iterable, Sequence from typing import Any from astropy.table import Table @@ -196,12 +195,15 @@ def report_v2( print_summary(summary, full_output_filename, brief) -def aggregate_reports(filenames: Iterable[str], full_output_filename: str | None, brief: bool = False) -> None: +def aggregate_reports( + filenames: Iterable[str], full_output_filename: str | None, brief: bool = False +) -> None: """Docstring. 
- open a bunch of json files, call model_validate_json, call aggregrate, print summary + open a bunch of json files, call model_validate_json, call aggregrate, + print summary """ - summaries : Iterable[Summary] = [] + summaries: Iterable[Summary] = [] for filename in filenames: with open(filename) as f: model = Summary.model_validate_json(f.read()) diff --git a/requirements.txt b/requirements.txt index 4623a213..2debf254 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,6 @@ networkx lsst-resources @ git+https://github.com/lsst/resources@main lsst-daf-butler @ git+https://github.com/lsst/daf_butler@main lsst-utils @ git+https://github.com/lsst/utils@main -lsst-pipe-base @ git+https://github.com/lsst/pipe_base@main +lsst-pipe-base @ git+https://github.com/lsst/pipe_base@tickets/DM-41605 lsst-pex-config @ git+https://github.com/lsst/pex_config@main sqlalchemy diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index d3b8bb6b..1c69a35c 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -32,15 +32,39 @@ import yaml from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli +from lsst.ctrl.mpexec.cli.script.report import print_summary from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir -from lsst.pipe.base.quantum_provenance_graph import DatasetTypeSummary, Summary, TaskSummary +from lsst.pipe.base.quantum_provenance_graph import Summary from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph from lsst.pipe.base.tests.util import check_output_run from yaml.loader import SafeLoader TESTDIR = os.path.abspath(os.path.dirname(__file__)) +expected_mock_datasets = [ + "add_dataset1", + "add2_dataset1", + "task0_metadata", + "task0_log", + "add_dataset2", + "add2_dataset2", + "task1_metadata", + "task1_log", + "add_dataset3", + "add2_dataset3", + "task2_metadata", + "task2_log", + "add_dataset4", + "add2_dataset4", + "task3_metadata", + "task3_log", + "add_dataset5", + "add2_dataset5", + "task4_metadata", + "task4_log", +] + class ReportTest(unittest.TestCase): """Test executing "pipetask report" command.""" @@ -203,291 +227,137 @@ def test_report(self): with open(test_filename_v2) as f: output = f.read() model = Summary.model_validate_json(output) - self.assertDictEqual( - model.tasks, - { - "task0": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task1": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task2": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task3": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task4": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - }, - ) - self.assertDictEqual( - model.datasets, - { - "add_dataset1": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], 
- n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset1": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task0_metadata": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task0_log": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset2": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset2": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task1_metadata": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task1_log": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset3": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset3": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task2_metadata": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task2_log": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset4": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset4": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task3_metadata": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 
0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task3_log": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset5": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset5": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task4_metadata": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task4_log": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - }, + # Below is the same set of tests as in `pipe_base`: + for task_summary in model.tasks.values(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_expected, 1) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in model.datasets.items(): + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}], + ) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 1) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + def test_aggregate_reports(self): + """Test `pipetask aggregate-reports` command. We make one + `SimpleQgraph` and then fake a copy in a couple of different ways, + making sure we can aggregate the similar graphs. 
+ """ + metadata = {"output_run": "run1"} + butler, qgraph1 = makeSimpleQGraph( + run="run", + root=self.root, + metadata=metadata, + ) + + # Check that we can get the proper run collection from the qgraph + self.assertEqual(check_output_run(qgraph1, "run"), []) + + # Save the graph + graph_uri_1 = os.path.join(self.root, "graph1.qgraph") + qgraph1.saveUri(graph_uri_1) + + file1 = os.path.join(self.root, "report_test_1.json") + file2 = os.path.join(self.root, "report_test_2.json") + aggregate_file = os.path.join(self.root, "aggregate_report.json") + + report1 = self.runner.invoke( + pipetask_cli, + [ + "report", + self.root, + graph_uri_1, + "--no-logs", + "--full-output-filename", + file1, + "--force-v2", + ], + input="no", + ) + + self.assertEqual(report1.exit_code, 0, clickResultMsg(report1)) + # Now, copy the json output into a duplicate file and aggregate + with open(file1, "r") as f: + sum1 = Summary.model_validate_json(f.read()) + sum2 = sum1.model_copy(deep=True) + print_summary(sum2, file2, brief=False) + + # Then use these file outputs as the inputs to aggregate reports: + aggregate_report = self.runner.invoke( + pipetask_cli, + [ + "aggregate-reports", + file1, + file2, + "--full-output-filename", + aggregate_file, + ], ) + # Check that aggregate command had a zero exit code: + self.assertEqual(aggregate_report.exit_code, 0, clickResultMsg(aggregate_report)) + + # Check that it aggregates as expected: + with open(aggregate_file) as f: + agg_sum = Summary.model_validate_json(f.read()) + for task_label, task_summary in agg_sum.tasks.items(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_expected, 2) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in agg_sum.datasets.items(): + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") if __name__ == "__main__": From 
46f434fdfecd9e2598e926d347ac16a96ef99d90 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Fri, 16 Aug 2024 18:02:32 -0700 Subject: [PATCH 3/3] Clean up and document --- doc/changes/DM-41605.feature.md | 6 ++++ python/lsst/ctrl/mpexec/cli/cmd/commands.py | 4 +-- python/lsst/ctrl/mpexec/cli/script/report.py | 31 +++++++++++++++----- requirements.txt | 2 +- tests/test_cliCmdReport.py | 12 ++++---- 5 files changed, 38 insertions(+), 17 deletions(-) create mode 100644 doc/changes/DM-41605.feature.md diff --git a/doc/changes/DM-41605.feature.md b/doc/changes/DM-41605.feature.md new file mode 100644 index 00000000..034982aa --- /dev/null +++ b/doc/changes/DM-41605.feature.md @@ -0,0 +1,6 @@ +Aggregate multiple `pipetask report` outputs into one wholistic `Summary`. + +While the `QuantumProvenanceGraph` was designed to resolve processing over +dataquery-identified groups, `pipetask aggregate-reports` is designed to +combine multiple group-level reports into one which totals the successes, +issues and failures over the same section of pipeline. \ No newline at end of file diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index a2461268..db16e2f0 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -26,7 +26,7 @@ # along with this program. If not, see . import sys -from collections.abc import Iterable, Iterator, Sequence +from collections.abc import Iterator, Sequence from contextlib import contextmanager from functools import partial from tempfile import NamedTemporaryFile @@ -433,7 +433,7 @@ def report( " also printed to the screen when using the --full-output-filename option.", ) def aggregate_reports( - filenames: Iterable[str], full_output_filename: str | None, brief: bool = False + filenames: Sequence[str], full_output_filename: str | None, brief: bool = False ) -> None: """Aggregate pipetask report output on disjoint data-id groups into one Summary over common tasks and datasets. Intended for use when the same diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 63d0828a..bb88b4b9 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pprint -from collections.abc import Iterable, Sequence -from typing import Any +from collections.abc import Sequence from astropy.table import Table from lsst.daf.butler import Butler @@ -196,18 +195,34 @@ def report_v2( def aggregate_reports( - filenames: Iterable[str], full_output_filename: str | None, brief: bool = False + filenames: Sequence[str], full_output_filename: str | None, brief: bool = False ) -> None: - """Docstring. + """Aggregrate multiple `QuantumProvenanceGraph` summaries on separate + dataquery-identified groups into one wholistic report. This is intended for + reports over the same tasks in the same pipeline, after `pipetask report` + has been resolved over all graphs associated with each group. - open a bunch of json files, call model_validate_json, call aggregrate, - print summary + Parameters + ---------- + filenames : `Sequence[str]` + The paths to the JSON files produced by `pipetask report` (note: this + is only compatible with the multi-graph or `--force-v2` option). These + files correspond to the `QuantumProvenanceGraph.Summary` objects which + are produced for each group. 
+ full_output_filename : `str | None` + The name of the JSON file in which to store the aggregate report, if + passed. This is passed to `print_summary` at the end of this function. + brief : `bool = False` + Only display short (counts-only) summary on stdout. This includes + counts and not error messages or data_ids (similar to BPS report). + This option will still report all `cursed` datasets and `wonky` + quanta. This is passed to `print_summary` at the end of this function. """ - summaries: Iterable[Summary] = [] + summaries: list[Summary] = [] for filename in filenames: with open(filename) as f: model = Summary.model_validate_json(f.read()) - summaries.append(model) + summaries.extend([model]) result = Summary.aggregate(summaries) print_summary(result, full_output_filename, brief) diff --git a/requirements.txt b/requirements.txt index 2debf254..4623a213 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,6 @@ networkx lsst-resources @ git+https://github.com/lsst/resources@main lsst-daf-butler @ git+https://github.com/lsst/daf_butler@main lsst-utils @ git+https://github.com/lsst/utils@main -lsst-pipe-base @ git+https://github.com/lsst/pipe_base@tickets/DM-41605 +lsst-pipe-base @ git+https://github.com/lsst/pipe_base@main lsst-pex-config @ git+https://github.com/lsst/pex_config@main sqlalchemy diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index 1c69a35c..947753cc 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -231,7 +231,7 @@ def test_report(self): for task_summary in model.tasks.values(): self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_unknown, 1) self.assertEqual(task_summary.n_expected, 1) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -243,8 +243,8 @@ def test_report(self): dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 1) self.assertEqual(dataset_type_summary.n_cursed, 0) @@ -327,7 +327,7 @@ def test_aggregate_reports(self): for task_label, task_summary in agg_sum.tasks.items(): self.assertEqual(task_summary.n_successful, 0) self.assertEqual(task_summary.n_blocked, 0) - self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_unknown, 2) self.assertEqual(task_summary.n_expected, 2) self.assertListEqual(task_summary.failed_quanta, []) self.assertListEqual(task_summary.recovered_quanta, []) @@ -339,8 +339,8 @@ def test_aggregate_reports(self): dataset_type_summary.unsuccessful_datasets, [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], ) - self.assertEqual(dataset_type_summary.n_published, 0) - self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_visible, 0) + self.assertEqual(dataset_type_summary.n_shadowed, 0) self.assertEqual(dataset_type_summary.n_predicted_only, 0) self.assertEqual(dataset_type_summary.n_expected, 2) self.assertEqual(dataset_type_summary.n_cursed, 0)
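
For readers who want to exercise the new entry point without tracing the full diff, the sketch below mirrors the flow these patches implement. The module paths, `Summary.model_validate_json`, `Summary.aggregate`, and `print_summary` are taken from the patch itself; the per-group report filenames are placeholders.

    from lsst.ctrl.mpexec.cli.script.report import print_summary
    from lsst.pipe.base.quantum_provenance_graph import Summary

    # Per-group reports previously written by
    # `pipetask report --force-v2 --full-output-filename <file> <repo> <graph>`.
    group_reports = ["group1_report.json", "group2_report.json"]  # placeholders

    summaries: list[Summary] = []
    for filename in group_reports:
        with open(filename) as f:
            # Each file holds one serialized QuantumProvenanceGraph.Summary.
            summaries.append(Summary.model_validate_json(f.read()))

    # Aggregation totals the per-task and per-dataset counts and concatenates
    # the per-data-id lists, as exercised by test_aggregate_reports above.
    combined = Summary.aggregate(summaries)
    print_summary(combined, "step_report.json", brief=False)

The equivalent command-line form added by this changeset is
`pipetask aggregate-reports group1_report.json group2_report.json --full-output-filename step_report.json`.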