Skip to content

Commit

Permalink
Initial prototype for visualization of pipeline processing status
Browse files Browse the repository at this point in the history
  • Loading branch information
enourbakhsh committed Dec 18, 2024
1 parent 269e412 commit 2668a91
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 5 deletions.
10 changes: 10 additions & 0 deletions python/lsst/pipe/base/pipeline_graph/visualization/_formatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def get_node_symbol(node: DisplayNodeKey, x: int | None = None) -> str:
return "▤"
raise ValueError(f"Unexpected node key: {node} of type {type(node)}.")

# Text-based output may only ever show the numbers, but colors are definitely
# wanted for the dot and mermaid output.
def get_node_color(node: DisplayNodeKey, x: int | None = None) -> str:
    """Return the color to use for a node (placeholder, not implemented).

    Parameters
    ----------
    node : `DisplayNodeKey`
        Key of the node a color is requested for.  Currently ignored.
    x : `int` or `None`, optional
        Currently ignored.  NOTE(review): presumably has the same meaning as
        the ``x`` argument of `get_node_symbol` -- confirm when implementing.

    Returns
    -------
    color : `str`
        This is the annotated return type, but the placeholder body does
        not produce a string yet: every call returns `None`.
    """
    # No color scheme has been defined so far.
    return None

class GetNodeText:
"""A callback for the `Printer` class's `get_text` callback that
Expand Down Expand Up @@ -231,3 +235,9 @@ def format_task_class(options: NodeAttributeOptions, task_class_name: str) -> st
case False:
return ""
raise ValueError(f"Invalid display option for task_classes: {options.task_classes!r}.")

def format_task_status(options: NodeStatusOptions, task_status: TaskStatusInfo) -> str:
    """Format the status of a task node as text (placeholder, not implemented).

    Parameters
    ----------
    options : `NodeStatusOptions`
        Options for displaying execution status.  Currently ignored.
    task_status : `TaskStatusInfo`
        Status counts for a single task.  Currently ignored.

    Returns
    -------
    formatted : `str`
        This is the annotated return type, but the placeholder body does
        not produce a string yet: every call returns `None`.
    """
    # Formatting has not been written yet.
    return None

def format_dataset_type_status(
    options: NodeStatusOptions, dataset_type_status: DatasetTypeStatusInfo
) -> str:
    """Format the status of a dataset type node as text (placeholder, not
    implemented).

    Parameters
    ----------
    options : `NodeStatusOptions`
        Options for displaying execution status.  Currently ignored.
    dataset_type_status : `DatasetTypeStatusInfo`
        Status counts for a single dataset type.  Currently ignored.

    Returns
    -------
    formatted : `str`
        This is the annotated return type, but the placeholder body does
        not produce a string yet: every call returns `None`.
    """
    # Formatting has not been written yet.
    return None
13 changes: 10 additions & 3 deletions python/lsst/pipe/base/pipeline_graph/visualization/_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import dataclasses
from typing import Literal

from ._status import NodeStatusOptions

@dataclasses.dataclass
class NodeAttributeOptions:
Expand Down Expand Up @@ -69,10 +69,13 @@ class NodeAttributeOptions:
- `None`: context-dependent default behavior.
"""

status: NodeStatusOptions | None = None
"""Options for displaying execution status."""

def __bool__(self) -> bool:
return bool(self.dimensions or self.storage_classes or self.task_classes)
return bool(self.dimensions or self.storage_classes or self.task_classes or self.status)

def checked(self, is_resolved: bool) -> NodeAttributeOptions:
def checked(self, is_resolved: bool, has_status: bool = False) -> NodeAttributeOptions:
"""Check these options against a pipeline graph's resolution status and
fill in defaults.
Expand All @@ -81,6 +84,9 @@ def checked(self, is_resolved: bool) -> NodeAttributeOptions:
is_resolved : `bool`
Whether the pipeline graph to be displayed is resolved
(`PipelineGraph.is_fully_resolved`).
has_status : `bool`
Whether the pipeline graph to be displayed has status information.
Defaults to `False`.
Returns
-------
Expand All @@ -106,4 +112,5 @@ def checked(self, is_resolved: bool) -> NodeAttributeOptions:
self.task_classes if self.task_classes is not None else ("concise" if is_resolved else False)
),
storage_classes=(self.storage_classes if self.storage_classes is not None else is_resolved),
status=self.status if has_status else None,
)
17 changes: 15 additions & 2 deletions python/lsst/pipe/base/pipeline_graph/visualization/_show.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from ._options import NodeAttributeOptions
from ._printer import make_default_printer
from ._status import StatusAnnotator, NodeStatusOptions

DisplayNodeKey = NodeKey | MergedNodeKey

Expand All @@ -64,6 +65,8 @@ def parse_display_args(
merge_output_trees: int = 4,
merge_intermediates: bool = True,
include_automatic_connections: bool = False,
status_annotator: StatusAnnotator | None = None,
status_options: NodeStatusOptions | None = None,
) -> tuple[networkx.DiGraph | networkx.MultiDiGraph, NodeAttributeOptions]:
"""Print a text-based ~.PipelineGraph` visualization.
Expand Down Expand Up @@ -126,21 +129,31 @@ def parse_display_args(
include_automatic_connections : `bool`, optional
Whether to include automatically-added connections like the config,
log, and metadata dataset types for each task. Default is `False`.
status_annotator : `StatusAnnotator`, optional
Annotator to add status information to the graph. Default is `None`.
status_options : `NodeStatusOptions`, optional
Options for displaying execution status. Default is `None`.
"""
if init is None:
if not dataset_types:
raise ValueError("Cannot show init and runtime graphs unless dataset types are shown.")
xgraph = pipeline_graph.make_xgraph()
if status_annotator is not None:
raise ValueError("Cannot show status with both init and runtime graphs.")
elif dataset_types:
xgraph = pipeline_graph.make_bipartite_xgraph(init)
if status_annotator is not None:
status_annotator(xgraph, dataset_types=True)
else:
xgraph = pipeline_graph.make_task_xgraph(init)
storage_classes = False
if status_annotator is not None:
status_annotator(xgraph, dataset_types=False)

options = NodeAttributeOptions(
dimensions=dimensions, storage_classes=storage_classes, task_classes=task_classes
dimensions=dimensions, storage_classes=storage_classes, task_classes=task_classes, status=status_options
)
options = options.checked(pipeline_graph.is_fully_resolved)
options = options.checked(pipeline_graph.is_fully_resolved, has_status=status_annotator is not None)

if dataset_types and not include_automatic_connections:
taskish_nodes: list[TaskNode | TaskInitNode] = []
Expand Down
135 changes: 135 additions & 0 deletions python/lsst/pipe/base/pipeline_graph/visualization/_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

# Public names of this module.  Every entry must be defined here; otherwise
# ``from ._status import *`` raises `AttributeError`.
__all__ = (
    "DatasetTypeStatusInfo",
    "NodeStatusOptions",
    "QuantumProvenanceGraphStatusAnnotator",
    "StatusAnnotator",
    "TaskStatusInfo",
)

import sys
from collections.abc import Sequence
from shutil import get_terminal_size
from typing import Any, Literal, TextIO

import networkx

from .._nodes import NodeKey
from .._pipeline_graph import PipelineGraph
from .._tasks import TaskInitNode, TaskNode
from ._formatting import GetNodeText, get_node_symbol
from ._layout import ColumnSelector, Layout
from ._merge import (
MergedNodeKey,
merge_graph_input_trees,
merge_graph_intermediates,
merge_graph_output_trees,
)
from ._options import NodeAttributeOptions
from ._printer import make_default_printer
import dataclasses
from typing import Protocol, overload, Literal, TYPE_CHECKING
from .._nodes import NodeKey, NodeType

if TYPE_CHECKING:
from ... import quantum_provenance_graph as qpg


@dataclasses.dataclass
class TaskStatusInfo:
    """Per-task counts stored under the ``"status"`` attribute of a task node.

    The first four counts are required; the remaining three default to
    `None`.
    """

    # Filled from ``n_expected`` by `QuantumProvenanceGraphStatusAnnotator`.
    expected: int

    # Filled from ``n_successful``.  NOTE(review): the field name is
    # misspelled ("succeded"); it is kept because it is part of the
    # constructor signature.
    succeded: int

    # Filled from ``n_failed``.
    failed: int

    # Filled from ``n_blocked``.
    blocked: int

    # Not set by `QuantumProvenanceGraphStatusAnnotator`; a comment in that
    # class says ``ready`` and ``running`` are meant for bps.
    ready: int | None = None
    running: int | None = None

    # Filled from ``n_wonky``.
    wonky: int | None = None


@dataclasses.dataclass
class DatasetTypeStatusInfo:
    """Per-dataset-type counts stored under the ``"status"`` attribute of a
    dataset type node.
    """

    # Filled from ``n_expected`` by `QuantumProvenanceGraphStatusAnnotator`.
    expected: int

    # Filled from ``n_visible + n_shadowed`` by the same annotator.
    produced: int


@dataclasses.dataclass
class NodeStatusOptions:
    """Options for displaying execution status.

    This class has no fields yet, so all instances compare equal.
    """

    # TODO: add colors here.


class StatusAnnotator(Protocol):
    """Interface for callables that annotate a networkx graph of tasks and
    possibly dataset types with status information.

    The overloads pair the graph type with the flag: a `networkx.DiGraph`
    goes with ``dataset_types=False`` and a `networkx.MultiDiGraph` with
    ``dataset_types=True``.  Nothing is returned.
    """

    @overload
    def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: ...

    @overload
    def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: ...

    def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None:
        """Annotate ``xgraph`` with status information.

        Parameters
        ----------
        xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
            Graph to annotate.
        dataset_types : `bool`
            Whether the graph contains dataset type nodes as well as tasks.
        """


class QuantumProvenanceGraphStatusAnnotator:
    """Annotate a networkx graph with counts read from a quantum provenance
    graph summary.

    Parameters
    ----------
    qpg_summary : `quantum_provenance_graph.Summary`
        Summary whose ``tasks`` and ``datasets`` mappings provide the
        per-task and per-dataset-type counts.

    Notes
    -----
    Calling the annotator stores a `TaskStatusInfo` (and, when requested, a
    `DatasetTypeStatusInfo`) under the ``"status"`` attribute of each
    matching node.  Summary entries with no matching node in the graph are
    skipped, so the annotator can be used on graphs that hold only some of
    the summarized tasks or no `NodeType.TASK` nodes at all.
    """

    def __init__(self, qpg_summary: qpg.Summary) -> None:
        self.qpg_summary = qpg_summary

    @overload
    def __call__(self, xgraph: networkx.DiGraph, dataset_types: Literal[False]) -> None: ...

    @overload
    def __call__(self, xgraph: networkx.MultiDiGraph, dataset_types: Literal[True]) -> None: ...

    def __call__(self, xgraph: networkx.DiGraph | networkx.MultiDiGraph, dataset_types: bool) -> None:
        """Attach status information to the nodes of ``xgraph`` in place.

        Parameters
        ----------
        xgraph : `networkx.DiGraph` or `networkx.MultiDiGraph`
            Graph whose nodes are keyed by `NodeKey`.
        dataset_types : `bool`
            If `True`, dataset type nodes are annotated as well as task
            nodes.
        """
        nodes = xgraph.nodes

        for task_label, task_summary in self.qpg_summary.tasks.items():
            key = NodeKey(NodeType.TASK, task_label)
            # Indexing a missing node would raise KeyError; skip entries
            # that are not part of the displayed graph instead.
            if key not in nodes:
                continue
            # Note: `ready` and `running` are for bps! For bps, we want to
            # add `pending` to `ready`.
            nodes[key]["status"] = TaskStatusInfo(
                expected=task_summary.n_expected,
                succeded=task_summary.n_successful,
                failed=task_summary.n_failed,
                blocked=task_summary.n_blocked,
                wonky=task_summary.n_wonky,
            )

        if not dataset_types:
            return

        for dataset_type_name, dataset_type_summary in self.qpg_summary.datasets.items():
            key = NodeKey(NodeType.DATASET_TYPE, dataset_type_name)
            if key not in nodes:
                continue
            nodes[key]["status"] = DatasetTypeStatusInfo(
                expected=dataset_type_summary.n_expected,
                produced=dataset_type_summary.n_visible + dataset_type_summary.n_shadowed,
            )

0 comments on commit 2668a91

Please sign in to comment.