diff --git a/e2e_test/batch/explain.slt b/e2e_test/batch/explain.slt index 1f9cebdf1b68b..f050c6cea5133 100644 --- a/e2e_test/batch/explain.slt +++ b/e2e_test/batch/explain.slt @@ -7,6 +7,18 @@ explain create index i on t(v); statement ok explain create sink sink_t from t with ( connector = 'kafka', type = 'append-only' ) +# statement ok +# set batch_parallelism=1; + +# query I +# explain (distsql, format dot) SELECT approx_percentile(0.5) WITHIN GROUP (order by v) from t; +# ---- +# digraph { +# 0 [ label = "Stage 0: QueryStage { id: 0, parallelism: Some(1), exchange_info: Some(ExchangeInfo { mode: Single, distribution: None }), has_table_scan: false }" ] +# 1 [ label = "Stage 1: QueryStage { id: 1, parallelism: Some(4), exchange_info: Some(ExchangeInfo { mode: Single, distribution: None }), has_table_scan: true }" ] +# 0 -> 1 [ label = "" ] +# } + statement ok drop table t; diff --git a/src/frontend/planner_test/tests/testdata/input/explain_dot_format.yaml b/src/frontend/planner_test/tests/testdata/input/explain_dot_format.yaml index 377f395b73b8d..ce0134d364d70 100644 --- a/src/frontend/planner_test/tests/testdata/input/explain_dot_format.yaml +++ b/src/frontend/planner_test/tests/testdata/input/explain_dot_format.yaml @@ -40,5 +40,11 @@ COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col17, COALESCE((SELECT b FROM t2 WHERE t1.a = t2.c), 0) col18 from t1; + expected_outputs: + - explain_output +- name: test dot output format (distsql, stream) + sql: | + CREATE TABLE t (v1 int); + explain (distsql, format dot) create materialized view m1 as SELECT approx_percentile(0.5) WITHIN GROUP (order by v1) from t; expected_outputs: - explain_output \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/output/explain_dot_format.yaml b/src/frontend/planner_test/tests/testdata/output/explain_dot_format.yaml index 2fce651976e82..6d53415a6d2d2 100644 --- a/src/frontend/planner_test/tests/testdata/output/explain_dot_format.yaml +++ b/src/frontend/planner_test/tests/testdata/output/explain_dot_format.yaml @@ -437,3 +437,32 @@ 176 -> 182 [ ] 182 -> 183 [ ] } +- name: test dot output format (distsql, stream) + sql: | + CREATE TABLE t (v1 int); + explain (distsql, format dot) create materialized view m1 as SELECT approx_percentile(0.5) WITHIN GROUP (order by v1) from t; + explain_output: | + digraph { + 0 [ label = "Fragment 0\l" ] + 1 [ label = "StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }\ltables: [\"Materialize: 4294967294\"]\l" ] + 2 [ label = "StreamGlobalApproxPercentile { quantile: 0.5:Float64, relative_error: 0.01:Float64 }\ltables: [\"GlobalApproxPercentileBucketState: 0\",\"GlobalApproxPercentileCountState: 1\"]\l" ] + 3 [ label = "StreamExchange Single from 1" ] + 4 [ label = "Fragment 1\l" ] + 5 [ label = "StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Float64, relative_error: 0.01:Float64 }" ] + 6 [ label = "StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] }" ] + 7 [ label = "StreamTableScan { table: t, columns: [v1, _row_id] }\ltables: [\"StreamScan: 2\"]\l" ] + 8 [ label = "Upstream" ] + 9 [ label = "BatchPlanNode" ] + 10 [ label = "Table 0\lcolumns: [\"sign\",\"bucket_id\",\"count\",\"_rw_timestamp\"]\lprimary key: [\"$0 ASC\",\"$1 ASC\"]\lvalue indices: [\"0\",\"1\",\"2\"]\ldistribution key: []\lread pk prefix len hint: \"0\"\l" ] + 11 [ label = "Table 1\lcolumns: [\"total_count\",\"_rw_timestamp\"]\lprimary key: []\lvalue indices: [\"0\"]\ldistribution key: []\lread pk prefix len hint: \"0\"\l" ] + 12 [ label = "Table 2\lcolumns: [\"vnode\",\"_row_id\",\"backfill_finished\",\"row_count\",\"_rw_timestamp\"]\lprimary key: [\"$0 ASC\"]\lvalue indices: [\"1\",\"2\",\"3\"]\ldistribution key: [\"0\"]\lread pk prefix len hint: \"1\"\lvnode column idx: \"0\"\l" ] + 13 [ label = "Table 4294967294\lcolumns: [\"approx_percentile\",\"_rw_timestamp\"]\lprimary key: []\lvalue indices: [\"0\"]\ldistribution key: []\lread pk prefix len hint: \"0\"\l" ] + 0 -> 1 [ label = "" ] + 1 -> 2 [ label = "" ] + 2 -> 3 [ label = "" ] + 4 -> 5 [ label = "" ] + 5 -> 6 [ label = "" ] + 6 -> 7 [ label = "" ] + 7 -> 8 [ label = "" ] + 7 -> 9 [ label = "" ] + } diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 66003051a70b9..ffcb170373dd7 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use petgraph::dot::Dot; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::bail_not_implemented; @@ -35,7 +36,7 @@ use crate::optimizer::plan_node::{Convention, Explain}; use crate::optimizer::OptimizerContext; use crate::scheduler::BatchPlanFragmenter; use crate::stream_fragmenter::build_graph; -use crate::utils::explain_stream_graph; +use crate::utils::{explain_stream_graph, explain_stream_graph_as_dot}; use crate::OptimizerContextRef; async fn do_handle_explain( @@ -46,6 +47,8 @@ async fn do_handle_explain( ) -> Result<()> { // Workaround to avoid `Rc` across `await` point. let mut batch_plan_fragmenter = None; + let mut batch_plan_fragmenter_fmt = ExplainFormat::Json; + let session = handler_args.session.clone(); { @@ -218,10 +221,19 @@ async fn do_handle_explain( session.config().batch_parallelism().0, plan.clone(), )?); + batch_plan_fragmenter_fmt = if explain_format == ExplainFormat::Dot { + ExplainFormat::Dot + } else { + ExplainFormat::Json + } } Convention::Stream => { let graph = build_graph(plan.clone())?; - blocks.push(explain_stream_graph(&graph, explain_verbose)); + if explain_format == ExplainFormat::Dot { + blocks.push(explain_stream_graph_as_dot(&graph, explain_verbose)) + } else { + blocks.push(explain_stream_graph(&graph, explain_verbose)); + } } } } @@ -255,8 +267,14 @@ async fn do_handle_explain( if let Some(fragmenter) = batch_plan_fragmenter { let query = fragmenter.generate_complete_query().await?; - let stage_graph_json = serde_json::to_string_pretty(&query.stage_graph).unwrap(); - blocks.push(stage_graph_json); + let stage_graph = if batch_plan_fragmenter_fmt == ExplainFormat::Dot { + let graph = query.stage_graph.to_petgraph(); + let dot = Dot::new(&graph); + dot.to_string() + } else { + serde_json::to_string_pretty(&query.stage_graph).unwrap() + }; + blocks.push(stage_graph); } Ok(()) diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 165f867b3c76e..5e7e76500e9a0 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -39,7 +39,7 @@ use fixedbitset::FixedBitSet; use itertools::Itertools; use paste::paste; use petgraph::dot::{Config, Dot}; -use petgraph::graph::{Graph, NodeIndex}; +use petgraph::graph::Graph; use pretty_xmlish::{Pretty, PrettyConfig}; use risingwave_common::catalog::Schema; use risingwave_common::util::recursive::{self, Recurse}; @@ -56,7 +56,7 @@ use super::property::{Distribution, FunctionalDependencySet, MonotonicityMap, Or use crate::error::{ErrorCode, Result}; use crate::optimizer::ExpressionSimplifyRewriter; use crate::session::current::notice_to_user; -use crate::utils::PrettySerde; +use crate::utils::{build_graph_from_pretty, PrettySerde}; /// A marker trait for different conventions, used for enforcing type safety. /// @@ -736,45 +736,6 @@ impl Explain for PlanRef { } } -fn build_graph_from_pretty( - pretty: &Pretty<'_>, - graph: &mut Graph, - nodes: &mut HashMap, - parent_label: Option<&str>, -) { - if let Pretty::Record(r) = pretty { - let mut label = String::new(); - label.push_str(&r.name); - for (k, v) in &r.fields { - label.push('\n'); - label.push_str(k); - label.push_str(": "); - label.push_str( - &serde_json::to_string(&PrettySerde(v.clone(), false)) - .expect("failed to serialize plan to dot"), - ); - } - // output alignment. - if !r.fields.is_empty() { - label.push('\n'); - } - - let current_node = *nodes - .entry(label.clone()) - .or_insert_with(|| graph.add_node(label.clone())); - - if let Some(parent_label) = parent_label { - if let Some(&parent_node) = nodes.get(parent_label) { - graph.add_edge(parent_node, current_node, "contains".to_string()); - } - } - - for child in &r.children { - build_graph_from_pretty(child, graph, nodes, Some(&label)); - } - } -} - pub(crate) fn pretty_config() -> PrettyConfig { PrettyConfig { indent: 3, diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 9cec27601a246..6a4682367ff0b 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -24,6 +24,7 @@ use enum_as_inner::EnumAsInner; use futures::TryStreamExt; use iceberg::expr::Predicate as IcebergPredicate; use itertools::Itertools; +use petgraph::{Directed, Graph}; use pgwire::pg_server::SessionId; use risingwave_batch::error::BatchError; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; @@ -839,6 +840,34 @@ impl StageGraph { Ok(()) } + + /// Converts the `StageGraph` into a `petgraph::graph::Graph`. + pub fn to_petgraph(&self) -> Graph { + let mut graph = Graph::::new(); + + let mut node_indices = HashMap::new(); + + // Add all stages as nodes + for (&stage_id, stage_ref) in self.stages.iter().sorted_by_key(|(id, _)| **id) { + let node_label = format!("Stage {}: {:?}", stage_id, stage_ref); + let node_index = graph.add_node(node_label); + node_indices.insert(stage_id, node_index); + } + + // Add edges between stages based on child_edges + for (&parent_id, children) in &self.child_edges { + if let Some(&parent_index) = node_indices.get(&parent_id) { + for &child_id in children { + if let Some(&child_index) = node_indices.get(&child_id) { + // Add an edge from parent to child + graph.add_edge(parent_index, child_index, "".to_string()); + } + } + } + } + + graph + } } struct StageGraphBuilder { diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs index effe7e69a3c05..888f66d7889c9 100644 --- a/src/frontend/src/utils/stream_graph_formatter.rs +++ b/src/frontend/src/utils/stream_graph_formatter.rs @@ -16,12 +16,16 @@ use std::cmp::max; use std::collections::{BTreeMap, HashMap}; use itertools::Itertools; +use petgraph::dot::Dot; +use petgraph::graph::NodeIndex; +use petgraph::Graph; use pretty_xmlish::{Pretty, PrettyConfig}; use risingwave_common::util::stream_graph_visitor; use risingwave_pb::catalog::Table; use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragmentEdge; use risingwave_pb::stream_plan::{stream_node, DispatcherType, StreamFragmentGraph, StreamNode}; +use super::PrettySerde; use crate::TableCatalog; /// ice: in the future, we may allow configurable width, boundaries, etc. @@ -36,6 +40,12 @@ pub fn explain_stream_graph(graph: &StreamFragmentGraph, is_verbose: bool) -> St output } +pub fn explain_stream_graph_as_dot(sg: &StreamFragmentGraph, is_verbose: bool) -> String { + let graph = StreamGraphFormatter::new(is_verbose).explain_graph_as_dot(sg); + let dot = Dot::new(&graph); + dot.to_string() +} + /// A formatter to display the final stream plan graph, used for `explain (distsql) create /// materialized view ...` struct StreamGraphFormatter { @@ -88,6 +98,34 @@ impl StreamGraphFormatter { } } + fn explain_graph_as_dot(&mut self, graph: &StreamFragmentGraph) -> Graph { + self.edges.clear(); + for edge in &graph.edges { + self.edges.insert(edge.link_id, edge.clone()); + } + + let mut g = Graph::::new(); + let mut nodes = HashMap::new(); + for (_, fragment) in graph.fragments.iter().sorted_by_key(|(id, _)| **id) { + let mut label = String::new(); + label.push_str("Fragment "); + label.push_str(&fragment.get_fragment_id().to_string()); + label.push('\n'); + nodes.insert(label.clone(), g.add_node(label.clone())); + + build_graph_from_pretty( + &self.explain_node(fragment.node.as_ref().unwrap()), + &mut g, + &mut nodes, + Some(&label), + ); + } + for tb in self.tables.values() { + build_graph_from_pretty(&self.explain_table(tb), &mut g, &mut nodes, None); + } + g + } + fn explain_table<'a>(&self, tb: &Table) -> Pretty<'a> { let tb = TableCatalog::from(tb.clone()); let columns = tb @@ -201,3 +239,42 @@ impl StreamGraphFormatter { Pretty::simple_record(one_line_explain, fields, children) } } + +pub fn build_graph_from_pretty( + pretty: &Pretty<'_>, + graph: &mut Graph, + nodes: &mut HashMap, + parent_label: Option<&str>, +) { + if let Pretty::Record(r) = pretty { + let mut label = String::new(); + label.push_str(&r.name); + for (k, v) in &r.fields { + label.push('\n'); + label.push_str(k); + label.push_str(": "); + label.push_str( + &serde_json::to_string(&PrettySerde(v.clone(), false)) + .expect("failed to serialize plan to dot"), + ); + } + // output alignment. + if !r.fields.is_empty() { + label.push('\n'); + } + + let current_node = *nodes + .entry(label.clone()) + .or_insert_with(|| graph.add_node(label.clone())); + + if let Some(parent_label) = parent_label { + if let Some(&parent_node) = nodes.get(parent_label) { + graph.add_edge(parent_node, current_node, "".to_string()); + } + } + + for child in &r.children { + build_graph_from_pretty(child, graph, nodes, Some(&label)); + } + } +}