From 217fef6adaf4989a7ebcb57fdb53c346ef362037 Mon Sep 17 00:00:00 2001 From: Erfan Nourbakhsh Date: Wed, 18 Dec 2024 15:49:41 -0500 Subject: [PATCH] Initial prototype for visualization of pipeline processing status --- .../visualization/_formatting.py | 11 ++ .../pipeline_graph/visualization/_options.py | 13 +- .../pipeline_graph/visualization/_show.py | 17 ++- .../pipeline_graph/visualization/_status.py | 135 ++++++++++++++++++ 4 files changed, 171 insertions(+), 5 deletions(-) create mode 100644 python/lsst/pipe/base/pipeline_graph/visualization/_status.py diff --git a/python/lsst/pipe/base/pipeline_graph/visualization/_formatting.py b/python/lsst/pipe/base/pipeline_graph/visualization/_formatting.py index df51654c7..ffe525835 100644 --- a/python/lsst/pipe/base/pipeline_graph/visualization/_formatting.py +++ b/python/lsst/pipe/base/pipeline_graph/visualization/_formatting.py @@ -39,6 +39,7 @@ from .._nodes import NodeKey, NodeType from ._merge import MergedNodeKey from ._options import NodeAttributeOptions +from ._status import NodeStatusOptions, TaskStatusInfo, DatasetTypeStatusInfo DisplayNodeKey = NodeKey | MergedNodeKey """Type alias for graph keys that may be original task, task init, or dataset @@ -77,6 +78,10 @@ def get_node_symbol(node: DisplayNodeKey, x: int | None = None) -> str: return "▤" raise ValueError(f"Unexpected node key: {node} of type {type(node)}.") +# The text based one may only ever have the numbers, but we definitely want +# to have the colors for dot and mermaid. +def get_node_color(node: DisplayNodeKey, x: int | None = None) -> str: + pass class GetNodeText: """A callback for the `Printer` class's `get_text` callback that @@ -231,3 +236,9 @@ def format_task_class(options: NodeAttributeOptions, task_class_name: str) -> st case False: return "" raise ValueError(f"Invalid display option for task_classes: {options.task_classes!r}.") + +def format_task_status(options: NodeStatusOptions, task_status: TaskStatusInfo) -> str: + return "" + +def format_dataset_type_status(options: NodeStatusOptions, dataset_type_status: DatasetTypeStatusInfo) -> str: + return "" \ No newline at end of file diff --git a/python/lsst/pipe/base/pipeline_graph/visualization/_options.py b/python/lsst/pipe/base/pipeline_graph/visualization/_options.py index 206a2d14f..4dd2aa4d2 100644 --- a/python/lsst/pipe/base/pipeline_graph/visualization/_options.py +++ b/python/lsst/pipe/base/pipeline_graph/visualization/_options.py @@ -30,7 +30,7 @@ import dataclasses from typing import Literal - +from ._status import NodeStatusOptions @dataclasses.dataclass class NodeAttributeOptions: @@ -69,10 +69,13 @@ class NodeAttributeOptions: - `None`: context-dependent default behavior. """ + status: NodeStatusOptions | None = None + """Options for displaying execution status.""" + def __bool__(self) -> bool: - return bool(self.dimensions or self.storage_classes or self.task_classes) + return bool(self.dimensions or self.storage_classes or self.task_classes or self.status) - def checked(self, is_resolved: bool) -> NodeAttributeOptions: + def checked(self, is_resolved: bool, has_status: bool = False) -> NodeAttributeOptions: """Check these options against a pipeline graph's resolution status and fill in defaults. @@ -81,6 +84,9 @@ def checked(self, is_resolved: bool) -> NodeAttributeOptions: is_resolved : `bool` Whether the pipeline graph to be displayed is resolved (`PipelineGraph.is_fully_resolved`). + has_status : `bool` + Whether the pipeline graph to be displayed has status information. + Defaults to `False`. Returns ------- @@ -106,4 +112,5 @@ def checked(self, is_resolved: bool) -> NodeAttributeOptions: self.task_classes if self.task_classes is not None else ("concise" if is_resolved else False) ), storage_classes=(self.storage_classes if self.storage_classes is not None else is_resolved), + status=self.status if has_status else None, ) diff --git a/python/lsst/pipe/base/pipeline_graph/visualization/_show.py b/python/lsst/pipe/base/pipeline_graph/visualization/_show.py index fb2c001fc..08a0080bc 100644 --- a/python/lsst/pipe/base/pipeline_graph/visualization/_show.py +++ b/python/lsst/pipe/base/pipeline_graph/visualization/_show.py @@ -48,6 +48,7 @@ ) from ._options import NodeAttributeOptions from ._printer import make_default_printer +from ._status import StatusAnnotator, NodeStatusOptions DisplayNodeKey = NodeKey | MergedNodeKey @@ -64,6 +65,8 @@ def parse_display_args( merge_output_trees: int = 4, merge_intermediates: bool = True, include_automatic_connections: bool = False, + status_annotator: StatusAnnotator | None = None, + status_options: NodeStatusOptions | None = None, ) -> tuple[networkx.DiGraph | networkx.MultiDiGraph, NodeAttributeOptions]: """Print a text-based ~.PipelineGraph` visualization. @@ -126,21 +129,31 @@ def parse_display_args( include_automatic_connections : `bool`, optional Whether to include automatically-added connections like the config, log, and metadata dataset types for each task. Default is `False`. + status_annotator : `StatusAnnotator`, optional + Annotator to add status information to the graph. Default is `None`. + status_options : `NodeStatusOptions`, optional + Options for displaying execution status. Default is `None`. """ if init is None: if not dataset_types: raise ValueError("Cannot show init and runtime graphs unless dataset types are shown.") xgraph = pipeline_graph.make_xgraph() + if status_annotator is not None: + raise ValueError("Cannot show status with both init and runtime graphs.") elif dataset_types: xgraph = pipeline_graph.make_bipartite_xgraph(init) + if status_annotator is not None: + status_annotator(xgraph, dataset_types=True) else: xgraph = pipeline_graph.make_task_xgraph(init) storage_classes = False + if status_annotator is not None: + status_annotator(xgraph, dataset_types=False) options = NodeAttributeOptions( - dimensions=dimensions, storage_classes=storage_classes, task_classes=task_classes + dimensions=dimensions, storage_classes=storage_classes, task_classes=task_classes, status=status_options ) - options = options.checked(pipeline_graph.is_fully_resolved) + options = options.checked(pipeline_graph.is_fully_resolved, has_status=status_annotator is not None) if dataset_types and not include_automatic_connections: taskish_nodes: list[TaskNode | TaskInitNode] = [] diff --git a/python/lsst/pipe/base/pipeline_graph/visualization/_status.py b/python/lsst/pipe/base/pipeline_graph/visualization/_status.py new file mode 100644 index 000000000..c0d59c197 --- /dev/null +++ b/python/lsst/pipe/base/pipeline_graph/visualization/_status.py @@ -0,0 +1,135 @@ +# This file is part of pipe_base. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from __future__ import annotations + +__all__ = ("show", "parse_display_args") + +import sys +from collections.abc import Sequence +from shutil import get_terminal_size +from typing import Any, Literal, TextIO + +import networkx + +from .._nodes import NodeKey +from .._pipeline_graph import PipelineGraph +from .._tasks import TaskInitNode, TaskNode +from ._formatting import GetNodeText, get_node_symbol +from ._layout import ColumnSelector, Layout +from ._merge import ( + MergedNodeKey, + merge_graph_input_trees, + merge_graph_intermediates, + merge_graph_output_trees, +) +from ._options import NodeAttributeOptions +from ._printer import make_default_printer +import dataclasses +from typing import Protocol, overload, Literal, TYPE_CHECKING +from .._nodes import NodeKey, NodeType + +if TYPE_CHECKING: + from ... import quantum_provenance_graph as qpg + + +@dataclasses.dataclass +class TaskStatusInfo: + expected: int + succeded: int + failed: int + blocked: int + ready: int | None = None + running: int | None = None + wonky: int | None = None + + +@dataclasses.dataclass +class DatasetTypeStatusInfo: + expected: int + produced: int + + +@dataclasses.dataclass +class NodeStatusOptions: + # Add colors here. + pass + + +class StatusAnnotator(Protocol): + """Annotate a networkx graph of tasks and possibly dataset types with + status information.""" + + @overload + def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: + ... + + @overload + def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: + ... + + def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None: + ... + + +class QuantumProvenanceGraphStatusAnnotator: + """...""" + + def __init__(self, qpg_summary: qpg.Summary) -> None: + self.qpg_summary = qpg_summary + + @overload + def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: + ... + + @overload + def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: + ... + + def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None: + for task_label, task_summary in self.qpg_summary.tasks.items(): + task_status_info = TaskStatusInfo( + expected=task_summary.n_expected, + succeded=task_summary.n_successful, + failed=task_summary.n_failed, + blocked=task_summary.n_blocked, + wonky=task_summary.n_wonky, + ) + # Note: `ready` and `running` are for bps! For bps, we want to add + # `pending` to `ready`. + + key = NodeKey(NodeType.TASK, task_label) + xgraph.nodes[key]["status"] = task_status_info + + if dataset_types: + for dataset_type_name, dataset_type_summary in self.qpg_summary.datasets.items(): + dataset_type_status_info = DatasetTypeStatusInfo( + expected=dataset_type_summary.n_expected, + produced=dataset_type_summary.n_visible + dataset_type_summary.n_shadowed, + ) + + key = NodeKey(NodeType.DATASET_TYPE, dataset_type_name) + xgraph.nodes[key]["status"] = dataset_type_status_info \ No newline at end of file