From 735294ace853bc9022a07e69d470640555946b85 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 13 Aug 2024 15:38:53 -0700 Subject: [PATCH] Tests for pipetask aggregate-reports --- python/lsst/ctrl/mpexec/cli/cmd/__init__.py | 13 +- python/lsst/ctrl/mpexec/cli/cmd/commands.py | 9 +- .../lsst/ctrl/mpexec/cli/script/__init__.py | 2 +- python/lsst/ctrl/mpexec/cli/script/report.py | 12 +- requirements.txt | 2 +- tests/test_cliCmdReport.py | 440 ++++++------------ 6 files changed, 180 insertions(+), 298 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/__init__.py b/python/lsst/ctrl/mpexec/cli/cmd/__init__.py index 5330afbe..d90812a9 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/__init__.py @@ -39,4 +39,15 @@ ] -from .commands import aggregate_reports, build, cleanup, pre_exec_init_qbb, purge, qgraph, report, run, run_qbb, update_graph_run +from .commands import ( + aggregate_reports, + build, + cleanup, + pre_exec_init_qbb, + purge, + qgraph, + report, + run, + run_qbb, + update_graph_run, +) diff --git a/python/lsst/ctrl/mpexec/cli/cmd/commands.py b/python/lsst/ctrl/mpexec/cli/cmd/commands.py index 367c358f..a2461268 100644 --- a/python/lsst/ctrl/mpexec/cli/cmd/commands.py +++ b/python/lsst/ctrl/mpexec/cli/cmd/commands.py @@ -26,7 +26,7 @@ # along with this program. If not, see . import sys -from collections.abc import Iterator, Sequence, Iterable +from collections.abc import Iterable, Iterator, Sequence from contextlib import contextmanager from functools import partial from tempfile import NamedTemporaryFile @@ -417,6 +417,7 @@ def report( assert len(qgraphs) == 1, "Cannot make a report without a quantum graph." script.report(repo, qgraphs[0], full_output_filename, logs, brief) + @click.command(cls=PipetaskCommand) @click.argument("filenames", nargs=-1) @click.option( @@ -432,9 +433,7 @@ def report( " also printed to the screen when using the --full-output-filename option.", ) def aggregate_reports( - filenames: Iterable[str], - full_output_filename: str | None, - brief: bool = False + filenames: Iterable[str], full_output_filename: str | None, brief: bool = False ) -> None: """Aggregate pipetask report output on disjoint data-id groups into one Summary over common tasks and datasets. Intended for use when the same @@ -442,7 +441,7 @@ def aggregate_reports( for a given step). This functionality is only compatible with reports from the `QuantumProvenanceGraph`, so the reports must be run over multiple groups or with the `--force-v2` option. - + Save the report as a file (`--full-output-filename`) or print it to stdout (default). If the terminal is overwhelmed with data_ids from failures try the `--brief` option. 
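For context before the remaining hunks, the flow the new `aggregate-reports` command wraps is roughly the following. This is a minimal sketch, not the patch's implementation: `Summary.model_validate_json` and `print_summary` do appear in this diff, but the `Summary.aggregate(...)` classmethod and the file names below are assumptions taken from the pipe_base side of DM-41605 rather than shown here.

    # Minimal sketch of aggregating per-group `pipetask report --force-v2` outputs.
    # Assumption: pipe_base (tickets/DM-41605) provides Summary.aggregate();
    # only Summary.model_validate_json() and print_summary() are visible in this diff.
    from lsst.ctrl.mpexec.cli.script.report import print_summary
    from lsst.pipe.base.quantum_provenance_graph import Summary

    # Hypothetical per-group reports written by `pipetask report --full-output-filename`.
    filenames = ["report_group1.json", "report_group2.json"]

    summaries: list[Summary] = []
    for filename in filenames:
        with open(filename) as f:
            # Each file is a JSON serialization of a Summary model.
            summaries.append(Summary.model_validate_json(f.read()))

    # Combine the per-group summaries over their common tasks and dataset types
    # (assumed classmethod on Summary).
    aggregate = Summary.aggregate(summaries)

    # Save the combined report; passing None as the filename prints to stdout,
    # which is the command's default behaviour.
    print_summary(aggregate, "aggregate_report.json", brief=False)

The equivalent command-line invocation, as exercised in the new test below, is:

    pipetask aggregate-reports report_group1.json report_group2.json --full-output-filename aggregate_report.json
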
diff --git a/python/lsst/ctrl/mpexec/cli/script/__init__.py b/python/lsst/ctrl/mpexec/cli/script/__init__.py index 399ebeeb..e195c8bd 100644 --- a/python/lsst/ctrl/mpexec/cli/script/__init__.py +++ b/python/lsst/ctrl/mpexec/cli/script/__init__.py @@ -31,7 +31,7 @@ from .pre_exec_init_qbb import pre_exec_init_qbb from .purge import PurgeResult, purge from .qgraph import qgraph -from .report import report, report_v2, aggregate_reports +from .report import aggregate_reports, report, report_v2 from .run import run from .run_qbb import run_qbb from .update_graph_run import update_graph_run diff --git a/python/lsst/ctrl/mpexec/cli/script/report.py b/python/lsst/ctrl/mpexec/cli/script/report.py index 69d1a792..63d0828a 100644 --- a/python/lsst/ctrl/mpexec/cli/script/report.py +++ b/python/lsst/ctrl/mpexec/cli/script/report.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import pprint -import time -from collections.abc import Sequence, Iterable +from collections.abc import Iterable, Sequence from typing import Any from astropy.table import Table @@ -196,12 +195,15 @@ def report_v2( print_summary(summary, full_output_filename, brief) -def aggregate_reports(filenames: Iterable[str], full_output_filename: str | None, brief: bool = False) -> None: +def aggregate_reports( + filenames: Iterable[str], full_output_filename: str | None, brief: bool = False +) -> None: """Docstring. - open a bunch of json files, call model_validate_json, call aggregrate, print summary + open a bunch of json files, call model_validate_json, call aggregrate, + print summary """ - summaries : Iterable[Summary] = [] + summaries: Iterable[Summary] = [] for filename in filenames: with open(filename) as f: model = Summary.model_validate_json(f.read()) diff --git a/requirements.txt b/requirements.txt index 4623a213..2debf254 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,6 @@ networkx lsst-resources @ git+https://github.com/lsst/resources@main lsst-daf-butler @ git+https://github.com/lsst/daf_butler@main lsst-utils @ git+https://github.com/lsst/utils@main -lsst-pipe-base @ git+https://github.com/lsst/pipe_base@main +lsst-pipe-base @ git+https://github.com/lsst/pipe_base@tickets/DM-41605 lsst-pex-config @ git+https://github.com/lsst/pex_config@main sqlalchemy diff --git a/tests/test_cliCmdReport.py b/tests/test_cliCmdReport.py index d3b8bb6b..1c69a35c 100644 --- a/tests/test_cliCmdReport.py +++ b/tests/test_cliCmdReport.py @@ -32,15 +32,39 @@ import yaml from lsst.ctrl.mpexec.cli.pipetask import cli as pipetask_cli +from lsst.ctrl.mpexec.cli.script.report import print_summary from lsst.daf.butler.cli.utils import LogCliRunner, clickResultMsg from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir -from lsst.pipe.base.quantum_provenance_graph import DatasetTypeSummary, Summary, TaskSummary +from lsst.pipe.base.quantum_provenance_graph import Summary from lsst.pipe.base.tests.simpleQGraph import makeSimpleQGraph from lsst.pipe.base.tests.util import check_output_run from yaml.loader import SafeLoader TESTDIR = os.path.abspath(os.path.dirname(__file__)) +expected_mock_datasets = [ + "add_dataset1", + "add2_dataset1", + "task0_metadata", + "task0_log", + "add_dataset2", + "add2_dataset2", + "task1_metadata", + "task1_log", + "add_dataset3", + "add2_dataset3", + "task2_metadata", + "task2_log", + "add_dataset4", + "add2_dataset4", + "task3_metadata", + "task3_log", + "add_dataset5", + "add2_dataset5", + "task4_metadata", 
+ "task4_log", +] + class ReportTest(unittest.TestCase): """Test executing "pipetask report" command.""" @@ -203,291 +227,137 @@ def test_report(self): with open(test_filename_v2) as f: output = f.read() model = Summary.model_validate_json(output) - self.assertDictEqual( - model.tasks, - { - "task0": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task1": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task2": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task3": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - "task4": TaskSummary( - n_successful=0, - n_blocked=0, - n_unknown=1, - n_expected=1, - failed_quanta=[], - recovered_quanta=[], - wonky_quanta=[], - n_wonky=0, - n_failed=0, - ), - }, - ) - self.assertDictEqual( - model.datasets, - { - "add_dataset1": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset1": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task0_metadata": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task0_log": DatasetTypeSummary( - producer="task0", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset2": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset2": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task1_metadata": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task1_log": DatasetTypeSummary( - producer="task1", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset3": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - 
"add2_dataset3": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task2_metadata": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task2_log": DatasetTypeSummary( - producer="task2", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset4": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset4": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task3_metadata": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task3_log": DatasetTypeSummary( - producer="task3", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add_dataset5": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "add2_dataset5": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task4_metadata": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - "task4_log": DatasetTypeSummary( - producer="task4", - n_visible=0, - n_shadowed=0, - n_predicted_only=0, - n_expected=1, - cursed_datasets=[], - unsuccessful_datasets=[{"instrument": "INSTR", "detector": 0}], - n_cursed=0, - n_unsuccessful=1, - ), - }, + # Below is the same set of tests as in `pipe_base`: + for task_summary in model.tasks.values(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_not_attempted, 1) + self.assertEqual(task_summary.n_expected, 1) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in model.datasets.items(): + self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}], + ) + 
self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 1) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 1) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") + + def test_aggregate_reports(self): + """Test `pipetask aggregate-reports` command. We make one + `SimpleQgraph` and then fake a copy in a couple of different ways, + making sure we can aggregate the similar graphs. + """ + metadata = {"output_run": "run1"} + butler, qgraph1 = makeSimpleQGraph( + run="run", + root=self.root, + metadata=metadata, + ) + + # Check that we can get the proper run collection from the qgraph + self.assertEqual(check_output_run(qgraph1, "run"), []) + + # Save the graph + graph_uri_1 = os.path.join(self.root, "graph1.qgraph") + qgraph1.saveUri(graph_uri_1) + + file1 = os.path.join(self.root, "report_test_1.json") + file2 = os.path.join(self.root, "report_test_2.json") + aggregate_file = os.path.join(self.root, "aggregate_report.json") + + report1 = self.runner.invoke( + pipetask_cli, + [ + "report", + self.root, + graph_uri_1, + "--no-logs", + "--full-output-filename", + file1, + "--force-v2", + ], + input="no", + ) + + self.assertEqual(report1.exit_code, 0, clickResultMsg(report1)) + # Now, copy the json output into a duplicate file and aggregate + with open(file1, "r") as f: + sum1 = Summary.model_validate_json(f.read()) + sum2 = sum1.model_copy(deep=True) + print_summary(sum2, file2, brief=False) + + # Then use these file outputs as the inputs to aggregate reports: + aggregate_report = self.runner.invoke( + pipetask_cli, + [ + "aggregate-reports", + file1, + file2, + "--full-output-filename", + aggregate_file, + ], ) + # Check that aggregate command had a zero exit code: + self.assertEqual(aggregate_report.exit_code, 0, clickResultMsg(aggregate_report)) + + # Check that it aggregates as expected: + with open(aggregate_file) as f: + agg_sum = Summary.model_validate_json(f.read()) + for task_label, task_summary in agg_sum.tasks.items(): + self.assertEqual(task_summary.n_successful, 0) + self.assertEqual(task_summary.n_blocked, 0) + self.assertEqual(task_summary.n_not_attempted, 2) + self.assertEqual(task_summary.n_expected, 2) + self.assertListEqual(task_summary.failed_quanta, []) + self.assertListEqual(task_summary.recovered_quanta, []) + self.assertListEqual(task_summary.wonky_quanta, []) + self.assertEqual(task_summary.n_wonky, 0) + self.assertEqual(task_summary.n_failed, 0) + for dataset_type_name, dataset_type_summary in agg_sum.datasets.items(): + 
self.assertListEqual( + dataset_type_summary.unsuccessful_datasets, + [{"instrument": "INSTR", "detector": 0}, {"instrument": "INSTR", "detector": 0}], + ) + self.assertEqual(dataset_type_summary.n_published, 0) + self.assertEqual(dataset_type_summary.n_unpublished, 0) + self.assertEqual(dataset_type_summary.n_predicted_only, 0) + self.assertEqual(dataset_type_summary.n_expected, 2) + self.assertEqual(dataset_type_summary.n_cursed, 0) + self.assertEqual(dataset_type_summary.n_unsuccessful, 2) + self.assertListEqual(dataset_type_summary.cursed_datasets, []) + self.assertIn(dataset_type_name, expected_mock_datasets) + match dataset_type_name: + case name if name in ["add_dataset1", "add2_dataset1", "task0_metadata", "task0_log"]: + self.assertEqual(dataset_type_summary.producer, "task0") + case name if name in ["add_dataset2", "add2_dataset2", "task1_metadata", "task1_log"]: + self.assertEqual(dataset_type_summary.producer, "task1") + case name if name in ["add_dataset3", "add2_dataset3", "task2_metadata", "task2_log"]: + self.assertEqual(dataset_type_summary.producer, "task2") + case name if name in ["add_dataset4", "add2_dataset4", "task3_metadata", "task3_log"]: + self.assertEqual(dataset_type_summary.producer, "task3") + case name if name in ["add_dataset5", "add2_dataset5", "task4_metadata", "task4_log"]: + self.assertEqual(dataset_type_summary.producer, "task4") if __name__ == "__main__":