From 74b190eb25d0c2470da15da102167ed4eeda685e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 20 Sep 2024 16:15:12 +0800 Subject: [PATCH] feat(dashboard): add backpressure to the relation dependency graph (#18280) (#18611) Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Co-authored-by: Eric Fu --- dashboard/README.md | 15 +- dashboard/components/FragmentGraph.tsx | 45 +--- dashboard/components/Layout.tsx | 2 +- .../components/RelationDependencyGraph.tsx | 63 +++++- dashboard/components/utils/backPressure.tsx | 43 ++++ dashboard/lib/api/fetch.ts | 5 +- dashboard/lib/api/streaming.ts | 9 + dashboard/lib/layout.ts | 41 +++- dashboard/pages/dependency_graph.tsx | 200 +++++++++++++++++- dashboard/pages/fragment_graph.tsx | 1 + proto/meta.proto | 6 + src/meta/src/dashboard/mod.rs | 74 ++++++- 12 files changed, 435 insertions(+), 69 deletions(-) create mode 100644 dashboard/components/utils/backPressure.tsx diff --git a/dashboard/README.md b/dashboard/README.md index 1212aef935684..724d21c964481 100644 --- a/dashboard/README.md +++ b/dashboard/README.md @@ -36,7 +36,20 @@ For example: ./risedev slt e2e_test/nexmark/create_sources.slt.part ./risedev psql -c 'CREATE TABLE dimension (v1 int);' ./risedev psql -c 'CREATE MATERIALIZED VIEW mv AS SELECT auction.* FROM dimension join auction on auction.id-auction.id = dimension.v1;' -./risedev psql -c 'INSERT INTO dimension select 0 from generate_series(1, 50);' +./risedev psql -c 'CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM mv;' +./risedev psql -c 'CREATE MATERIALIZED VIEW mv3 AS SELECT count(*) FROM mv2;' + +./risedev psql -c 'CREATE MATERIALIZED VIEW mv4 AS SELECT * FROM mv;' +./risedev psql -c 'CREATE MATERIALIZED VIEW mv5 AS SELECT count(*) FROM mv2;' +./risedev psql -c 'CREATE MATERIALIZED VIEW mv6 AS SELECT mv4.* FROM mv4 join mv2 using(id);' +./risedev psql -c 'CREATE MATERIALIZED VIEW mv7 AS SELECT max(id) FROM mv;' + +./risedev psql -c 'CREATE MATERIALIZED VIEW mv8 AS SELECT mv.* FROM mv join mv6 using(id);' +./risedev psql -c 'CREATE SCHEMA s1;' +./risedev psql -c 'CREATE TABLE s1.t1 (v1 int);' +./risedev psql -c 'CREATE MATERIALIZED VIEW s1.mv1 AS SELECT s1.t1.* FROM s1.t1 join mv on s1.t1.v1 = mv.id;' + +./risedev psql -c 'INSERT INTO dimension select 0 from generate_series(1, 20);' ``` Install dependencies and start the development server. diff --git a/dashboard/components/FragmentGraph.tsx b/dashboard/components/FragmentGraph.tsx index 6ca2c14dda28e..1e7d80c8641c3 100644 --- a/dashboard/components/FragmentGraph.tsx +++ b/dashboard/components/FragmentGraph.tsx @@ -10,7 +10,6 @@ import { theme, useDisclosure, } from "@chakra-ui/react" -import { tinycolor } from "@ctrl/tinycolor" import loadable from "@loadable/component" import * as d3 from "d3" import { cloneDeep } from "lodash" @@ -26,6 +25,7 @@ import { } from "../lib/layout" import { PlanNodeDatum } from "../pages/fragment_graph" import { StreamNode } from "../proto/gen/stream_plan" +import { backPressureColor, backPressureWidth } from "./utils/backPressure" const ReactJson = loadable(() => import("react-json-view")) @@ -396,7 +396,7 @@ export default function FragmentGraph({ if (backPressures) { let value = backPressures.get(`${d.target}_${d.source}`) if (value) { - return backPressureWidth(value) + return backPressureWidth(value, 30) } } @@ -482,44 +482,3 @@ export default function FragmentGraph({ ) } - -/** - * The color for the edge with given back pressure value. - * - * @param value The back pressure rate, between 0 and 100. - */ -function backPressureColor(value: number) { - const colorRange = [ - theme.colors.green["100"], - theme.colors.green["300"], - theme.colors.yellow["400"], - theme.colors.orange["500"], - theme.colors.red["700"], - ].map((c) => tinycolor(c)) - - value = Math.max(value, 0) - value = Math.min(value, 100) - - const step = colorRange.length - 1 - const pos = (value / 100) * step - const floor = Math.floor(pos) - const ceil = Math.ceil(pos) - - const color = tinycolor(colorRange[floor]) - .mix(tinycolor(colorRange[ceil]), (pos - floor) * 100) - .toHexString() - - return color -} - -/** - * The width for the edge with given back pressure value. - * - * @param value The back pressure rate, between 0 and 100. - */ -function backPressureWidth(value: number) { - value = Math.max(value, 0) - value = Math.min(value, 100) - - return 30 * (value / 100) + 2 -} diff --git a/dashboard/components/Layout.tsx b/dashboard/components/Layout.tsx index 6daa7e821ce3c..f16270adb2f4f 100644 --- a/dashboard/components/Layout.tsx +++ b/dashboard/components/Layout.tsx @@ -144,7 +144,7 @@ function Layout({ children }: { children: React.ReactNode }) {
Streaming - Dependency Graph + Relation Graph Fragment Graph
diff --git a/dashboard/components/RelationDependencyGraph.tsx b/dashboard/components/RelationDependencyGraph.tsx index d2f5052dc368d..1ab2fc7839a81 100644 --- a/dashboard/components/RelationDependencyGraph.tsx +++ b/dashboard/components/RelationDependencyGraph.tsx @@ -25,6 +25,7 @@ import { relationTypeTitleCase, } from "../lib/api/streaming" import { + Edge, Enter, Position, RelationPoint, @@ -33,6 +34,7 @@ import { generateRelationEdges, } from "../lib/layout" import { CatalogModal, useCatalogModal } from "./CatalogModal" +import { backPressureColor, backPressureWidth } from "./utils/backPressure" function boundBox( relationPosition: RelationPointPosition[], @@ -59,10 +61,12 @@ export default function RelationDependencyGraph({ nodes, selectedId, setSelectedId, + backPressures, }: { nodes: RelationPoint[] selectedId: string | undefined setSelectedId: (id: string) => void + backPressures?: Map // relationId-relationId->back_pressure_rate}) }) { const [modalData, setModalId] = useCatalogModal(nodes.map((n) => n.relation)) @@ -114,22 +118,59 @@ export default function RelationDependencyGraph({ const isSelected = (id: string) => id === selectedId - const applyEdge = (sel: EdgeSelection) => + const applyEdge = (sel: EdgeSelection) => { + const color = (d: Edge) => { + if (backPressures) { + let value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + return backPressureColor(value) + } + } + + return theme.colors.gray["300"] + } + + const width = (d: Edge) => { + if (backPressures) { + let value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + return backPressureWidth(value, 15) + } + } + + return 2 + } + sel .attr("d", ({ points }) => line(points)) .attr("fill", "none") - .attr("stroke-width", 1) - .attr("stroke-width", (d) => - isSelected(d.source) || isSelected(d.target) ? 4 : 2 - ) + .attr("stroke-width", width) + .attr("stroke", color) .attr("opacity", (d) => isSelected(d.source) || isSelected(d.target) ? 1 : 0.5 ) - .attr("stroke", (d) => - isSelected(d.source) || isSelected(d.target) - ? theme.colors.blue["500"] - : theme.colors.gray["300"] - ) + + // Tooltip for back pressure rate + let title = sel.select("title") + if (title.empty()) { + title = sel.append("title") + } + + const text = (d: Edge) => { + if (backPressures) { + let value = backPressures.get(`${d.target}_${d.source}`) + if (value) { + return `${value.toFixed(2)}%` + } + } + + return "" + } + + title.text(text) + + return sel + } const createEdge = (sel: Enter) => sel.append("path").attr("class", "edge").call(applyEdge) @@ -224,7 +265,7 @@ export default function RelationDependencyGraph({ nodeSelection.enter().call(createNode) nodeSelection.call(applyNode) nodeSelection.exit().remove() - }, [layoutMap, links, selectedId, setModalId, setSelectedId]) + }, [layoutMap, links, selectedId, setModalId, setSelectedId, backPressures]) return ( <> diff --git a/dashboard/components/utils/backPressure.tsx b/dashboard/components/utils/backPressure.tsx new file mode 100644 index 0000000000000..afb26e0746da4 --- /dev/null +++ b/dashboard/components/utils/backPressure.tsx @@ -0,0 +1,43 @@ +import { theme } from "@chakra-ui/react" +import { tinycolor } from "@ctrl/tinycolor" + +/** + * The color for the edge with given back pressure value. + * + * @param value The back pressure rate, between 0 and 100. + */ +export function backPressureColor(value: number) { + const colorRange = [ + theme.colors.green["100"], + theme.colors.green["300"], + theme.colors.yellow["400"], + theme.colors.orange["500"], + theme.colors.red["700"], + ].map((c) => tinycolor(c)) + + value = Math.max(value, 0) + value = Math.min(value, 100) + + const step = colorRange.length - 1 + const pos = (value / 100) * step + const floor = Math.floor(pos) + const ceil = Math.ceil(pos) + + const color = tinycolor(colorRange[floor]) + .mix(tinycolor(colorRange[ceil]), (pos - floor) * 100) + .toHexString() + + return color +} + +/** + * The width for the edge with given back pressure value. + * + * @param value The back pressure rate, between 0 and 100. + */ +export function backPressureWidth(value: number, scale: number) { + value = Math.max(value, 0) + value = Math.min(value, 100) + + return scale * (value / 100) + 2 +} diff --git a/dashboard/lib/api/fetch.ts b/dashboard/lib/api/fetch.ts index 6cf980202b548..41aad1049a15a 100644 --- a/dashboard/lib/api/fetch.ts +++ b/dashboard/lib/api/fetch.ts @@ -33,7 +33,6 @@ export default function useFetch( const [response, setResponse] = useState() const toast = useErrorToast() - // NOTE(eric): Don't put `fetchFn` in the dependency array. It might be a lambda function useEffect(() => { const fetchData = async () => { if (when) { @@ -53,6 +52,10 @@ export default function useFetch( const timer = setInterval(fetchData, intervalMs) return () => clearInterval(timer) + // NOTE(eric): Don't put `fetchFn` in the dependency array. Otherwise, it can cause an infinite loop. + // This is because `fetchFn` can be recreated every render, then it will trigger a dependency change, + // which triggers a re-render, and so on. + // eslint-disable-next-line react-hooks/exhaustive-deps }, [toast, intervalMs, when]) return { response } diff --git a/dashboard/lib/api/streaming.ts b/dashboard/lib/api/streaming.ts index 95ad89fdf5c58..211bd1b6bbc4c 100644 --- a/dashboard/lib/api/streaming.ts +++ b/dashboard/lib/api/streaming.ts @@ -27,6 +27,7 @@ import { View, } from "../../proto/gen/catalog" import { + FragmentVertexToRelationMap, ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies, RelationIdInfos, TableFragments, @@ -130,6 +131,13 @@ export async function getRelationDependencies() { return await getObjectDependencies() } +export async function getFragmentVertexToRelationMap() { + let res = await api.get("/fragment_vertex_to_relation_id_map") + let fragmentVertexToRelationMap: FragmentVertexToRelationMap = + FragmentVertexToRelationMap.fromJSON(res) + return fragmentVertexToRelationMap +} + async function getTableCatalogsInner( path: "tables" | "materialized_views" | "indexes" | "internal_tables" ) { @@ -200,6 +208,7 @@ export async function getSchemas() { return schemas } +// Returns a map of object id to a list of object ids that it depends on export async function getObjectDependencies() { let objDependencies: ObjectDependencies[] = ( await api.get("/object_dependencies") diff --git a/dashboard/lib/layout.ts b/dashboard/lib/layout.ts index ca96325e9802f..e6549d4b1e979 100644 --- a/dashboard/lib/layout.ts +++ b/dashboard/lib/layout.ts @@ -288,8 +288,14 @@ export interface LayoutItemBase { export type FragmentBox = LayoutItemBase & { name: string + // Upstream Fragment Ids. externalParentIds: string[] - fragment?: TableFragments_Fragment + fragment: TableFragments_Fragment +} + +export type RelationBox = LayoutItemBase & { + relationName: string + schemaName: string } export type RelationPoint = LayoutItemBase & { @@ -304,6 +310,7 @@ export interface Position { export type FragmentBoxPosition = FragmentBox & Position export type RelationPointPosition = RelationPoint & Position +export type RelationBoxPosition = RelationBox & Position export interface Edge { points: Array @@ -489,7 +496,7 @@ export function generateFragmentEdges( // Simply draw a horizontal line here. // Typically, external parent is only applicable to `StreamScan` fragment, // and there'll be only one external parent due to `UpstreamShard` distribution - // and plan node sharing. So there's no overlapping issue. + // and plan node sharing. So we won't see multiple horizontal lines overlap each other. for (const externalParentId of fragment.externalParentIds) { links.push({ points: [ @@ -509,3 +516,33 @@ export function generateFragmentEdges( } return links } + +export function generateRelationBackPressureEdges( + layoutMap: RelationBoxPosition[] +): Edge[] { + const links = [] + const relationMap = new Map() + for (const x of layoutMap) { + relationMap.set(x.id, x) + } + for (const relation of layoutMap) { + for (const parentId of relation.parentIds) { + const parentRelation = relationMap.get(parentId)! + links.push({ + points: [ + { + x: relation.x + relation.width / 2, + y: relation.y + relation.height / 2, + }, + { + x: parentRelation.x + parentRelation.width / 2, + y: parentRelation.y + parentRelation.height / 2, + }, + ], + source: relation.id, + target: parentId, + }) + } + } + return links +} diff --git a/dashboard/pages/dependency_graph.tsx b/dashboard/pages/dependency_graph.tsx index 11d349725cfc1..9217a4d05e2ef 100644 --- a/dashboard/pages/dependency_graph.tsx +++ b/dashboard/pages/dependency_graph.tsx @@ -15,25 +15,55 @@ * */ -import { Box, Button, Flex, Text, VStack } from "@chakra-ui/react" -import { reverse, sortBy } from "lodash" +import { + Box, + Button, + Flex, + FormControl, + FormLabel, + Select, + Text, + VStack, +} from "@chakra-ui/react" +import _, { reverse, sortBy } from "lodash" import Head from "next/head" import { parseAsInteger, useQueryState } from "nuqs" -import { Fragment, useCallback } from "react" +import { Fragment, useCallback, useEffect, useMemo, useState } from "react" import RelationDependencyGraph, { nodeRadius, } from "../components/RelationDependencyGraph" import Title from "../components/Title" +import useErrorToast from "../hook/useErrorToast" import useFetch from "../lib/api/fetch" +import { + calculateBPRate, + calculateCumulativeBp, + fetchEmbeddedBackPressure, + fetchPrometheusBackPressure, +} from "../lib/api/metric" import { Relation, + getFragmentVertexToRelationMap, getRelationDependencies, getRelations, relationIsStreamingJob, } from "../lib/api/streaming" import { RelationPoint } from "../lib/layout" +import { BackPressureInfo } from "../proto/gen/monitor_service" const SIDEBAR_WIDTH = "200px" +const INTERVAL_MS = 5000 + +type BackPressureDataSource = "Embedded" | "Prometheus" + +// The state of the embedded back pressure metrics. +// The metrics from previous fetch are stored here to calculate the rate. +interface EmbeddedBackPressureInfo { + previous: BackPressureInfo[] + current: BackPressureInfo[] + totalBackpressureNs: BackPressureInfo[] + totalDurationNs: number +} function buildDependencyAsEdges( list: Relation[], @@ -67,6 +97,19 @@ export default function StreamingGraph() { // Since dependentRelations will be deprecated, we need to use getRelationDependencies here to separately obtain the dependency relationship. const { response: relationDeps } = useFetch(getRelationDependencies) const [selectedId, setSelectedId] = useQueryState("id", parseAsInteger) + const { response: fragmentVertexToRelationMap } = useFetch( + getFragmentVertexToRelationMap + ) + const [resetEmbeddedBackPressures, setResetEmbeddedBackPressures] = + useState(false) + + const toggleResetEmbeddedBackPressures = () => { + setResetEmbeddedBackPressures( + (resetEmbeddedBackPressures) => !resetEmbeddedBackPressures + ) + } + + const toast = useErrorToast() const relationDependencyCallback = useCallback(() => { if (relationList && relationDeps) { @@ -78,9 +121,120 @@ export default function StreamingGraph() { const relationDependency = relationDependencyCallback() + const [backPressureDataSource, setBackPressureDataSource] = + useState("Embedded") + + // Periodically fetch Prometheus back-pressure from Meta node + const { response: prometheusMetrics } = useFetch( + fetchPrometheusBackPressure, + INTERVAL_MS, + backPressureDataSource === "Prometheus" + ) + + // Periodically fetch embedded back-pressure from Meta node + // Didn't call `useFetch()` because the `setState` way is special. + const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] = + useState() + useEffect(() => { + if (resetEmbeddedBackPressures) { + setEmbeddedBackPressureInfo(undefined) + toggleResetEmbeddedBackPressures() + } + if (backPressureDataSource === "Embedded") { + const interval = setInterval(() => { + fetchEmbeddedBackPressure().then( + (newBP) => { + setEmbeddedBackPressureInfo((prev) => + prev + ? { + previous: prev.current, + current: newBP, + totalBackpressureNs: calculateCumulativeBp( + prev.totalBackpressureNs, + prev.current, + newBP + ), + totalDurationNs: + prev.totalDurationNs + INTERVAL_MS * 1000 * 1000, + } + : { + previous: newBP, // Use current value to show zero rate, but it's fine + current: newBP, + totalBackpressureNs: [], + totalDurationNs: 0, + } + ) + }, + (e) => { + console.error(e) + toast(e, "error") + } + ) + }, INTERVAL_MS) + return () => { + clearInterval(interval) + } + } + }, [backPressureDataSource, toast, resetEmbeddedBackPressures]) + + // Get relationId-relationId -> backpressure rate map + const backPressures: Map | undefined = useMemo(() => { + if (!fragmentVertexToRelationMap) { + return new Map() + } + let inMap = fragmentVertexToRelationMap.inMap + let outMap = fragmentVertexToRelationMap.outMap + if (prometheusMetrics || embeddedBackPressureInfo) { + let map = new Map() + + if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) { + const metrics = calculateBPRate( + embeddedBackPressureInfo.totalBackpressureNs, + embeddedBackPressureInfo.totalDurationNs + ) + for (const m of metrics.outputBufferBlockingDuration) { + let output = Number(m.metric.fragmentId) + let input = Number(m.metric.downstreamFragmentId) + if (outMap[output] && inMap[input]) { + output = outMap[output] + input = inMap[input] + let key = `${output}_${input}` + map.set(key, m.sample[0].value) + } + } + } else if (backPressureDataSource === "Prometheus" && prometheusMetrics) { + for (const m of prometheusMetrics.outputBufferBlockingDuration) { + if (m.sample.length > 0) { + // Note: We issue an instant query to Prometheus to get the most recent value. + // So there should be only one sample here. + // + // Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still + // possible that an old version of meta service returns a range-query result. + // So we take the one with the latest timestamp here. + const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100 + let output = Number(m.metric.fragment_id) + let input = Number(m.metric.downstream_fragment_id) + if (outMap[output] && inMap[input]) { + output = outMap[output] + input = inMap[input] + let key = `${output}_${input}` + map.set(key, value) + } + } + } + } + return map + } + }, [ + backPressureDataSource, + prometheusMetrics, + embeddedBackPressureInfo, + fragmentVertexToRelationMap, + ]) + const retVal = ( - Dependency Graph + Relation Graph - - Relations - + {/* NOTE(kwannoel): No need to reset prometheus bp, because it is stateless */} + + + + + Back Pressure Data Source + + + + + + Relations + {relationList?.map((r) => { const match = selectedId === r.id return ( @@ -122,12 +303,13 @@ export default function StreamingGraph() { overflowX="scroll" overflowY="scroll" > - Dependency Graph + Relation Graph {relationDependency && ( setSelectedId(parseInt(id))} + backPressures={backPressures} /> )} @@ -138,7 +320,7 @@ export default function StreamingGraph() { return ( - Streaming Graph + Relation Graph {retVal} diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 42098ff222858..23b4f04119c07 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -236,6 +236,7 @@ export default function Streaming() { } }, [relationId, relationList, setRelationId]) + // The table fragments of the selected fragment id const fragmentDependency = fragmentDependencyCallback()?.fragmentDep const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag const fragments = fragmentDependencyCallback()?.fragments diff --git a/proto/meta.proto b/proto/meta.proto index 46528f37ee7db..fc1c77856095c 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -793,6 +793,12 @@ message RelationIdInfos { map map = 1; } +message FragmentVertexToRelationMap { + // fragment_id -> relation_id + map in_map = 1; + map out_map = 2; +} + message ActorCountPerParallelism { message WorkerActorCount { uint64 actor_count = 1; diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index cfdf2bea0f322..992b6a4839be7 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -63,12 +63,14 @@ pub(super) mod handlers { use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; use risingwave_pb::meta::{ - ActorIds, FragmentIdToActorIdMap, PbTableFragments, RelationIdInfos, + ActorIds, FragmentIdToActorIdMap, FragmentVertexToRelationMap, PbTableFragments, + RelationIdInfos, }; use risingwave_pb::monitor_service::{ GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse, }; + use risingwave_pb::stream_plan::FragmentTypeFlag; use risingwave_pb::user::PbUserInfo; use serde_json::json; use thiserror_ext::AsReport; @@ -217,6 +219,67 @@ pub(super) mod handlers { Ok(Json(table_fragments)) } + /// In the ddl backpressure graph, we want to compute the backpressure between relations. + /// So we need to know which are the fragments which are connected to external relations. + /// These fragments form the vertices of the graph. + /// We can get collection of backpressure values, keyed by vertex_id-vertex_id. + /// This function will return a map of fragment vertex id to relation id. + /// We can convert `vertex_id-vertex_id` to `relation_id-relation_id` using that. + /// Finally, we have a map of `relation_id-relation_id` to backpressure values. + pub async fn get_fragment_vertex_to_relation_id_map( + Extension(srv): Extension, + ) -> Result> { + let map = match &srv.metadata_manager { + MetadataManager::V1(mgr) => { + let core = mgr.fragment_manager.get_fragment_read_guard().await; + let table_fragments = core.table_fragments(); + let mut in_map = HashMap::new(); + let mut out_map = HashMap::new(); + for (relation_id, tf) in table_fragments { + for (fragment_id, fragment) in &tf.fragments { + if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 + || (fragment.fragment_type_mask + & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) + != 0 + { + in_map.insert(*fragment_id, relation_id.table_id); + } + if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 { + out_map.insert(*fragment_id, relation_id.table_id); + } + } + } + FragmentVertexToRelationMap { in_map, out_map } + } + MetadataManager::V2(mgr) => { + let table_fragments = mgr + .catalog_controller + .table_fragments() + .await + .map_err(err)?; + let mut in_map = HashMap::new(); + let mut out_map = HashMap::new(); + for (relation_id, tf) in table_fragments { + for (fragment_id, fragment) in &tf.fragments { + if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0 + || (fragment.fragment_type_mask + & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) + != 0 + { + in_map.insert(*fragment_id, relation_id as u32); + } + if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 { + out_map.insert(*fragment_id, relation_id as u32); + } + } + } + FragmentVertexToRelationMap { in_map, out_map } + } + }; + Ok(Json(map)) + } + + /// Provides a hierarchy of relation ids to fragments to actors. pub async fn get_relation_id_infos( Extension(srv): Extension, ) -> Result> { @@ -473,6 +536,11 @@ pub(super) mod handlers { Ok(srv.diagnose_command.report().await) } + /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API, + /// the workload should be reasonable. + /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join). + /// In such a scenario, the number of edges is linear to the number of nodes. + /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`. pub async fn get_embedded_back_pressures( Extension(srv): Extension, ) -> Result> { @@ -522,6 +590,10 @@ impl DashboardService { .route("/fragments2", get(list_fragments)) .route("/fragments/job_id/:job_id", get(list_fragments_by_job_id)) .route("/relation_id_infos", get(get_relation_id_infos)) + .route( + "/fragment_vertex_to_relation_id_map", + get(get_fragment_vertex_to_relation_id_map), + ) .route("/views", get(list_views)) .route("/materialized_views", get(list_materialized_views)) .route("/tables", get(list_tables))