diff --git a/dashboard/README.md b/dashboard/README.md
index 1212aef935684..724d21c964481 100644
--- a/dashboard/README.md
+++ b/dashboard/README.md
@@ -36,7 +36,20 @@ For example:
./risedev slt e2e_test/nexmark/create_sources.slt.part
./risedev psql -c 'CREATE TABLE dimension (v1 int);'
./risedev psql -c 'CREATE MATERIALIZED VIEW mv AS SELECT auction.* FROM dimension join auction on auction.id-auction.id = dimension.v1;'
-./risedev psql -c 'INSERT INTO dimension select 0 from generate_series(1, 50);'
+./risedev psql -c 'CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM mv;'
+./risedev psql -c 'CREATE MATERIALIZED VIEW mv3 AS SELECT count(*) FROM mv2;'
+
+./risedev psql -c 'CREATE MATERIALIZED VIEW mv4 AS SELECT * FROM mv;'
+./risedev psql -c 'CREATE MATERIALIZED VIEW mv5 AS SELECT count(*) FROM mv2;'
+./risedev psql -c 'CREATE MATERIALIZED VIEW mv6 AS SELECT mv4.* FROM mv4 join mv2 using(id);'
+./risedev psql -c 'CREATE MATERIALIZED VIEW mv7 AS SELECT max(id) FROM mv;'
+
+./risedev psql -c 'CREATE MATERIALIZED VIEW mv8 AS SELECT mv.* FROM mv join mv6 using(id);'
+./risedev psql -c 'CREATE SCHEMA s1;'
+./risedev psql -c 'CREATE TABLE s1.t1 (v1 int);'
+./risedev psql -c 'CREATE MATERIALIZED VIEW s1.mv1 AS SELECT s1.t1.* FROM s1.t1 join mv on s1.t1.v1 = mv.id;'
+
+./risedev psql -c 'INSERT INTO dimension select 0 from generate_series(1, 20);'
```
Install dependencies and start the development server.
diff --git a/dashboard/components/FragmentGraph.tsx b/dashboard/components/FragmentGraph.tsx
index 6ca2c14dda28e..1e7d80c8641c3 100644
--- a/dashboard/components/FragmentGraph.tsx
+++ b/dashboard/components/FragmentGraph.tsx
@@ -10,7 +10,6 @@ import {
theme,
useDisclosure,
} from "@chakra-ui/react"
-import { tinycolor } from "@ctrl/tinycolor"
import loadable from "@loadable/component"
import * as d3 from "d3"
import { cloneDeep } from "lodash"
@@ -26,6 +25,7 @@ import {
} from "../lib/layout"
import { PlanNodeDatum } from "../pages/fragment_graph"
import { StreamNode } from "../proto/gen/stream_plan"
+import { backPressureColor, backPressureWidth } from "./utils/backPressure"
const ReactJson = loadable(() => import("react-json-view"))
@@ -396,7 +396,7 @@ export default function FragmentGraph({
if (backPressures) {
let value = backPressures.get(`${d.target}_${d.source}`)
if (value) {
- return backPressureWidth(value)
+ return backPressureWidth(value, 30)
}
}
@@ -482,44 +482,3 @@ export default function FragmentGraph({
)
}
-
-/**
- * The color for the edge with given back pressure value.
- *
- * @param value The back pressure rate, between 0 and 100.
- */
-function backPressureColor(value: number) {
- const colorRange = [
- theme.colors.green["100"],
- theme.colors.green["300"],
- theme.colors.yellow["400"],
- theme.colors.orange["500"],
- theme.colors.red["700"],
- ].map((c) => tinycolor(c))
-
- value = Math.max(value, 0)
- value = Math.min(value, 100)
-
- const step = colorRange.length - 1
- const pos = (value / 100) * step
- const floor = Math.floor(pos)
- const ceil = Math.ceil(pos)
-
- const color = tinycolor(colorRange[floor])
- .mix(tinycolor(colorRange[ceil]), (pos - floor) * 100)
- .toHexString()
-
- return color
-}
-
-/**
- * The width for the edge with given back pressure value.
- *
- * @param value The back pressure rate, between 0 and 100.
- */
-function backPressureWidth(value: number) {
- value = Math.max(value, 0)
- value = Math.min(value, 100)
-
- return 30 * (value / 100) + 2
-}
diff --git a/dashboard/components/Layout.tsx b/dashboard/components/Layout.tsx
index 6daa7e821ce3c..f16270adb2f4f 100644
--- a/dashboard/components/Layout.tsx
+++ b/dashboard/components/Layout.tsx
@@ -144,7 +144,7 @@ function Layout({ children }: { children: React.ReactNode }) {
Streaming
- Dependency Graph
+ Relation Graph
Fragment Graph
diff --git a/dashboard/components/RelationDependencyGraph.tsx b/dashboard/components/RelationDependencyGraph.tsx
index d2f5052dc368d..1ab2fc7839a81 100644
--- a/dashboard/components/RelationDependencyGraph.tsx
+++ b/dashboard/components/RelationDependencyGraph.tsx
@@ -25,6 +25,7 @@ import {
relationTypeTitleCase,
} from "../lib/api/streaming"
import {
+ Edge,
Enter,
Position,
RelationPoint,
@@ -33,6 +34,7 @@ import {
generateRelationEdges,
} from "../lib/layout"
import { CatalogModal, useCatalogModal } from "./CatalogModal"
+import { backPressureColor, backPressureWidth } from "./utils/backPressure"
function boundBox(
relationPosition: RelationPointPosition[],
@@ -59,10 +61,12 @@ export default function RelationDependencyGraph({
nodes,
selectedId,
setSelectedId,
+ backPressures,
}: {
nodes: RelationPoint[]
selectedId: string | undefined
setSelectedId: (id: string) => void
+ backPressures?: Map // relationId-relationId->back_pressure_rate})
}) {
const [modalData, setModalId] = useCatalogModal(nodes.map((n) => n.relation))
@@ -114,22 +118,59 @@ export default function RelationDependencyGraph({
const isSelected = (id: string) => id === selectedId
- const applyEdge = (sel: EdgeSelection) =>
+ const applyEdge = (sel: EdgeSelection) => {
+ const color = (d: Edge) => {
+ if (backPressures) {
+ let value = backPressures.get(`${d.target}_${d.source}`)
+ if (value) {
+ return backPressureColor(value)
+ }
+ }
+
+ return theme.colors.gray["300"]
+ }
+
+ const width = (d: Edge) => {
+ if (backPressures) {
+ let value = backPressures.get(`${d.target}_${d.source}`)
+ if (value) {
+ return backPressureWidth(value, 15)
+ }
+ }
+
+ return 2
+ }
+
sel
.attr("d", ({ points }) => line(points))
.attr("fill", "none")
- .attr("stroke-width", 1)
- .attr("stroke-width", (d) =>
- isSelected(d.source) || isSelected(d.target) ? 4 : 2
- )
+ .attr("stroke-width", width)
+ .attr("stroke", color)
.attr("opacity", (d) =>
isSelected(d.source) || isSelected(d.target) ? 1 : 0.5
)
- .attr("stroke", (d) =>
- isSelected(d.source) || isSelected(d.target)
- ? theme.colors.blue["500"]
- : theme.colors.gray["300"]
- )
+
+ // Tooltip for back pressure rate
+ let title = sel.select("title")
+ if (title.empty()) {
+ title = sel.append("title")
+ }
+
+ const text = (d: Edge) => {
+ if (backPressures) {
+ let value = backPressures.get(`${d.target}_${d.source}`)
+ if (value) {
+ return `${value.toFixed(2)}%`
+ }
+ }
+
+ return ""
+ }
+
+ title.text(text)
+
+ return sel
+ }
const createEdge = (sel: Enter) =>
sel.append("path").attr("class", "edge").call(applyEdge)
@@ -224,7 +265,7 @@ export default function RelationDependencyGraph({
nodeSelection.enter().call(createNode)
nodeSelection.call(applyNode)
nodeSelection.exit().remove()
- }, [layoutMap, links, selectedId, setModalId, setSelectedId])
+ }, [layoutMap, links, selectedId, setModalId, setSelectedId, backPressures])
return (
<>
diff --git a/dashboard/components/utils/backPressure.tsx b/dashboard/components/utils/backPressure.tsx
new file mode 100644
index 0000000000000..afb26e0746da4
--- /dev/null
+++ b/dashboard/components/utils/backPressure.tsx
@@ -0,0 +1,43 @@
+import { theme } from "@chakra-ui/react"
+import { tinycolor } from "@ctrl/tinycolor"
+
+/**
+ * The color for the edge with given back pressure value.
+ *
+ * @param value The back pressure rate, between 0 and 100.
+ */
+export function backPressureColor(value: number) {
+ const colorRange = [
+ theme.colors.green["100"],
+ theme.colors.green["300"],
+ theme.colors.yellow["400"],
+ theme.colors.orange["500"],
+ theme.colors.red["700"],
+ ].map((c) => tinycolor(c))
+
+ value = Math.max(value, 0)
+ value = Math.min(value, 100)
+
+ const step = colorRange.length - 1
+ const pos = (value / 100) * step
+ const floor = Math.floor(pos)
+ const ceil = Math.ceil(pos)
+
+ const color = tinycolor(colorRange[floor])
+ .mix(tinycolor(colorRange[ceil]), (pos - floor) * 100)
+ .toHexString()
+
+ return color
+}
+
+/**
+ * The width for the edge with given back pressure value.
+ *
+ * @param value The back pressure rate, between 0 and 100.
+ */
+export function backPressureWidth(value: number, scale: number) {
+ value = Math.max(value, 0)
+ value = Math.min(value, 100)
+
+ return scale * (value / 100) + 2
+}
diff --git a/dashboard/lib/api/fetch.ts b/dashboard/lib/api/fetch.ts
index 6cf980202b548..41aad1049a15a 100644
--- a/dashboard/lib/api/fetch.ts
+++ b/dashboard/lib/api/fetch.ts
@@ -33,7 +33,6 @@ export default function useFetch(
const [response, setResponse] = useState()
const toast = useErrorToast()
- // NOTE(eric): Don't put `fetchFn` in the dependency array. It might be a lambda function
useEffect(() => {
const fetchData = async () => {
if (when) {
@@ -53,6 +52,10 @@ export default function useFetch(
const timer = setInterval(fetchData, intervalMs)
return () => clearInterval(timer)
+ // NOTE(eric): Don't put `fetchFn` in the dependency array. Otherwise, it can cause an infinite loop.
+ // This is because `fetchFn` can be recreated every render, then it will trigger a dependency change,
+ // which triggers a re-render, and so on.
+ // eslint-disable-next-line react-hooks/exhaustive-deps
}, [toast, intervalMs, when])
return { response }
diff --git a/dashboard/lib/api/streaming.ts b/dashboard/lib/api/streaming.ts
index 95ad89fdf5c58..211bd1b6bbc4c 100644
--- a/dashboard/lib/api/streaming.ts
+++ b/dashboard/lib/api/streaming.ts
@@ -27,6 +27,7 @@ import {
View,
} from "../../proto/gen/catalog"
import {
+ FragmentVertexToRelationMap,
ListObjectDependenciesResponse_ObjectDependencies as ObjectDependencies,
RelationIdInfos,
TableFragments,
@@ -130,6 +131,13 @@ export async function getRelationDependencies() {
return await getObjectDependencies()
}
+export async function getFragmentVertexToRelationMap() {
+ let res = await api.get("/fragment_vertex_to_relation_id_map")
+ let fragmentVertexToRelationMap: FragmentVertexToRelationMap =
+ FragmentVertexToRelationMap.fromJSON(res)
+ return fragmentVertexToRelationMap
+}
+
async function getTableCatalogsInner(
path: "tables" | "materialized_views" | "indexes" | "internal_tables"
) {
@@ -200,6 +208,7 @@ export async function getSchemas() {
return schemas
}
+// Returns a map of object id to a list of object ids that it depends on
export async function getObjectDependencies() {
let objDependencies: ObjectDependencies[] = (
await api.get("/object_dependencies")
diff --git a/dashboard/lib/layout.ts b/dashboard/lib/layout.ts
index ca96325e9802f..e6549d4b1e979 100644
--- a/dashboard/lib/layout.ts
+++ b/dashboard/lib/layout.ts
@@ -288,8 +288,14 @@ export interface LayoutItemBase {
export type FragmentBox = LayoutItemBase & {
name: string
+ // Upstream Fragment Ids.
externalParentIds: string[]
- fragment?: TableFragments_Fragment
+ fragment: TableFragments_Fragment
+}
+
+export type RelationBox = LayoutItemBase & {
+ relationName: string
+ schemaName: string
}
export type RelationPoint = LayoutItemBase & {
@@ -304,6 +310,7 @@ export interface Position {
export type FragmentBoxPosition = FragmentBox & Position
export type RelationPointPosition = RelationPoint & Position
+export type RelationBoxPosition = RelationBox & Position
export interface Edge {
points: Array
@@ -489,7 +496,7 @@ export function generateFragmentEdges(
// Simply draw a horizontal line here.
// Typically, external parent is only applicable to `StreamScan` fragment,
// and there'll be only one external parent due to `UpstreamShard` distribution
- // and plan node sharing. So there's no overlapping issue.
+ // and plan node sharing. So we won't see multiple horizontal lines overlap each other.
for (const externalParentId of fragment.externalParentIds) {
links.push({
points: [
@@ -509,3 +516,33 @@ export function generateFragmentEdges(
}
return links
}
+
+export function generateRelationBackPressureEdges(
+ layoutMap: RelationBoxPosition[]
+): Edge[] {
+ const links = []
+ const relationMap = new Map()
+ for (const x of layoutMap) {
+ relationMap.set(x.id, x)
+ }
+ for (const relation of layoutMap) {
+ for (const parentId of relation.parentIds) {
+ const parentRelation = relationMap.get(parentId)!
+ links.push({
+ points: [
+ {
+ x: relation.x + relation.width / 2,
+ y: relation.y + relation.height / 2,
+ },
+ {
+ x: parentRelation.x + parentRelation.width / 2,
+ y: parentRelation.y + parentRelation.height / 2,
+ },
+ ],
+ source: relation.id,
+ target: parentId,
+ })
+ }
+ }
+ return links
+}
diff --git a/dashboard/pages/dependency_graph.tsx b/dashboard/pages/dependency_graph.tsx
index 11d349725cfc1..9217a4d05e2ef 100644
--- a/dashboard/pages/dependency_graph.tsx
+++ b/dashboard/pages/dependency_graph.tsx
@@ -15,25 +15,55 @@
*
*/
-import { Box, Button, Flex, Text, VStack } from "@chakra-ui/react"
-import { reverse, sortBy } from "lodash"
+import {
+ Box,
+ Button,
+ Flex,
+ FormControl,
+ FormLabel,
+ Select,
+ Text,
+ VStack,
+} from "@chakra-ui/react"
+import _, { reverse, sortBy } from "lodash"
import Head from "next/head"
import { parseAsInteger, useQueryState } from "nuqs"
-import { Fragment, useCallback } from "react"
+import { Fragment, useCallback, useEffect, useMemo, useState } from "react"
import RelationDependencyGraph, {
nodeRadius,
} from "../components/RelationDependencyGraph"
import Title from "../components/Title"
+import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
+import {
+ calculateBPRate,
+ calculateCumulativeBp,
+ fetchEmbeddedBackPressure,
+ fetchPrometheusBackPressure,
+} from "../lib/api/metric"
import {
Relation,
+ getFragmentVertexToRelationMap,
getRelationDependencies,
getRelations,
relationIsStreamingJob,
} from "../lib/api/streaming"
import { RelationPoint } from "../lib/layout"
+import { BackPressureInfo } from "../proto/gen/monitor_service"
const SIDEBAR_WIDTH = "200px"
+const INTERVAL_MS = 5000
+
+type BackPressureDataSource = "Embedded" | "Prometheus"
+
+// The state of the embedded back pressure metrics.
+// The metrics from previous fetch are stored here to calculate the rate.
+interface EmbeddedBackPressureInfo {
+ previous: BackPressureInfo[]
+ current: BackPressureInfo[]
+ totalBackpressureNs: BackPressureInfo[]
+ totalDurationNs: number
+}
function buildDependencyAsEdges(
list: Relation[],
@@ -67,6 +97,19 @@ export default function StreamingGraph() {
// Since dependentRelations will be deprecated, we need to use getRelationDependencies here to separately obtain the dependency relationship.
const { response: relationDeps } = useFetch(getRelationDependencies)
const [selectedId, setSelectedId] = useQueryState("id", parseAsInteger)
+ const { response: fragmentVertexToRelationMap } = useFetch(
+ getFragmentVertexToRelationMap
+ )
+ const [resetEmbeddedBackPressures, setResetEmbeddedBackPressures] =
+ useState(false)
+
+ const toggleResetEmbeddedBackPressures = () => {
+ setResetEmbeddedBackPressures(
+ (resetEmbeddedBackPressures) => !resetEmbeddedBackPressures
+ )
+ }
+
+ const toast = useErrorToast()
const relationDependencyCallback = useCallback(() => {
if (relationList && relationDeps) {
@@ -78,9 +121,120 @@ export default function StreamingGraph() {
const relationDependency = relationDependencyCallback()
+ const [backPressureDataSource, setBackPressureDataSource] =
+ useState("Embedded")
+
+ // Periodically fetch Prometheus back-pressure from Meta node
+ const { response: prometheusMetrics } = useFetch(
+ fetchPrometheusBackPressure,
+ INTERVAL_MS,
+ backPressureDataSource === "Prometheus"
+ )
+
+ // Periodically fetch embedded back-pressure from Meta node
+ // Didn't call `useFetch()` because the `setState` way is special.
+ const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] =
+ useState()
+ useEffect(() => {
+ if (resetEmbeddedBackPressures) {
+ setEmbeddedBackPressureInfo(undefined)
+ toggleResetEmbeddedBackPressures()
+ }
+ if (backPressureDataSource === "Embedded") {
+ const interval = setInterval(() => {
+ fetchEmbeddedBackPressure().then(
+ (newBP) => {
+ setEmbeddedBackPressureInfo((prev) =>
+ prev
+ ? {
+ previous: prev.current,
+ current: newBP,
+ totalBackpressureNs: calculateCumulativeBp(
+ prev.totalBackpressureNs,
+ prev.current,
+ newBP
+ ),
+ totalDurationNs:
+ prev.totalDurationNs + INTERVAL_MS * 1000 * 1000,
+ }
+ : {
+ previous: newBP, // Use current value to show zero rate, but it's fine
+ current: newBP,
+ totalBackpressureNs: [],
+ totalDurationNs: 0,
+ }
+ )
+ },
+ (e) => {
+ console.error(e)
+ toast(e, "error")
+ }
+ )
+ }, INTERVAL_MS)
+ return () => {
+ clearInterval(interval)
+ }
+ }
+ }, [backPressureDataSource, toast, resetEmbeddedBackPressures])
+
+ // Get relationId-relationId -> backpressure rate map
+ const backPressures: Map | undefined = useMemo(() => {
+ if (!fragmentVertexToRelationMap) {
+ return new Map()
+ }
+ let inMap = fragmentVertexToRelationMap.inMap
+ let outMap = fragmentVertexToRelationMap.outMap
+ if (prometheusMetrics || embeddedBackPressureInfo) {
+ let map = new Map()
+
+ if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) {
+ const metrics = calculateBPRate(
+ embeddedBackPressureInfo.totalBackpressureNs,
+ embeddedBackPressureInfo.totalDurationNs
+ )
+ for (const m of metrics.outputBufferBlockingDuration) {
+ let output = Number(m.metric.fragmentId)
+ let input = Number(m.metric.downstreamFragmentId)
+ if (outMap[output] && inMap[input]) {
+ output = outMap[output]
+ input = inMap[input]
+ let key = `${output}_${input}`
+ map.set(key, m.sample[0].value)
+ }
+ }
+ } else if (backPressureDataSource === "Prometheus" && prometheusMetrics) {
+ for (const m of prometheusMetrics.outputBufferBlockingDuration) {
+ if (m.sample.length > 0) {
+ // Note: We issue an instant query to Prometheus to get the most recent value.
+ // So there should be only one sample here.
+ //
+ // Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
+ // possible that an old version of meta service returns a range-query result.
+ // So we take the one with the latest timestamp here.
+ const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
+ let output = Number(m.metric.fragment_id)
+ let input = Number(m.metric.downstream_fragment_id)
+ if (outMap[output] && inMap[input]) {
+ output = outMap[output]
+ input = inMap[input]
+ let key = `${output}_${input}`
+ map.set(key, value)
+ }
+ }
+ }
+ }
+ return map
+ }
+ }, [
+ backPressureDataSource,
+ prometheusMetrics,
+ embeddedBackPressureInfo,
+ fragmentVertexToRelationMap,
+ ])
+
const retVal = (
- Dependency Graph
+ Relation Graph
-
- Relations
-
+ {/* NOTE(kwannoel): No need to reset prometheus bp, because it is stateless */}
+
+
+
+
+ Back Pressure Data Source
+
+
+
+
+
+ Relations
+
{relationList?.map((r) => {
const match = selectedId === r.id
return (
@@ -122,12 +303,13 @@ export default function StreamingGraph() {
overflowX="scroll"
overflowY="scroll"
>
- Dependency Graph
+ Relation Graph
{relationDependency && (
setSelectedId(parseInt(id))}
+ backPressures={backPressures}
/>
)}
@@ -138,7 +320,7 @@ export default function StreamingGraph() {
return (
- Streaming Graph
+ Relation Graph
{retVal}
diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx
index 42098ff222858..23b4f04119c07 100644
--- a/dashboard/pages/fragment_graph.tsx
+++ b/dashboard/pages/fragment_graph.tsx
@@ -236,6 +236,7 @@ export default function Streaming() {
}
}, [relationId, relationList, setRelationId])
+ // The table fragments of the selected fragment id
const fragmentDependency = fragmentDependencyCallback()?.fragmentDep
const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag
const fragments = fragmentDependencyCallback()?.fragments
diff --git a/proto/meta.proto b/proto/meta.proto
index 46528f37ee7db..fc1c77856095c 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -793,6 +793,12 @@ message RelationIdInfos {
map map = 1;
}
+message FragmentVertexToRelationMap {
+ // fragment_id -> relation_id
+ map in_map = 1;
+ map out_map = 2;
+}
+
message ActorCountPerParallelism {
message WorkerActorCount {
uint64 actor_count = 1;
diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs
index cfdf2bea0f322..992b6a4839be7 100644
--- a/src/meta/src/dashboard/mod.rs
+++ b/src/meta/src/dashboard/mod.rs
@@ -63,12 +63,14 @@ pub(super) mod handlers {
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies;
use risingwave_pb::meta::{
- ActorIds, FragmentIdToActorIdMap, PbTableFragments, RelationIdInfos,
+ ActorIds, FragmentIdToActorIdMap, FragmentVertexToRelationMap, PbTableFragments,
+ RelationIdInfos,
};
use risingwave_pb::monitor_service::{
GetBackPressureResponse, HeapProfilingResponse, ListHeapProfilingResponse,
StackTraceResponse,
};
+ use risingwave_pb::stream_plan::FragmentTypeFlag;
use risingwave_pb::user::PbUserInfo;
use serde_json::json;
use thiserror_ext::AsReport;
@@ -217,6 +219,67 @@ pub(super) mod handlers {
Ok(Json(table_fragments))
}
+ /// In the ddl backpressure graph, we want to compute the backpressure between relations.
+ /// So we need to know which are the fragments which are connected to external relations.
+ /// These fragments form the vertices of the graph.
+ /// We can get collection of backpressure values, keyed by vertex_id-vertex_id.
+ /// This function will return a map of fragment vertex id to relation id.
+ /// We can convert `vertex_id-vertex_id` to `relation_id-relation_id` using that.
+ /// Finally, we have a map of `relation_id-relation_id` to backpressure values.
+ pub async fn get_fragment_vertex_to_relation_id_map(
+ Extension(srv): Extension,
+ ) -> Result> {
+ let map = match &srv.metadata_manager {
+ MetadataManager::V1(mgr) => {
+ let core = mgr.fragment_manager.get_fragment_read_guard().await;
+ let table_fragments = core.table_fragments();
+ let mut in_map = HashMap::new();
+ let mut out_map = HashMap::new();
+ for (relation_id, tf) in table_fragments {
+ for (fragment_id, fragment) in &tf.fragments {
+ if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0
+ || (fragment.fragment_type_mask
+ & FragmentTypeFlag::SnapshotBackfillStreamScan as u32)
+ != 0
+ {
+ in_map.insert(*fragment_id, relation_id.table_id);
+ }
+ if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
+ out_map.insert(*fragment_id, relation_id.table_id);
+ }
+ }
+ }
+ FragmentVertexToRelationMap { in_map, out_map }
+ }
+ MetadataManager::V2(mgr) => {
+ let table_fragments = mgr
+ .catalog_controller
+ .table_fragments()
+ .await
+ .map_err(err)?;
+ let mut in_map = HashMap::new();
+ let mut out_map = HashMap::new();
+ for (relation_id, tf) in table_fragments {
+ for (fragment_id, fragment) in &tf.fragments {
+ if (fragment.fragment_type_mask & FragmentTypeFlag::StreamScan as u32) != 0
+ || (fragment.fragment_type_mask
+ & FragmentTypeFlag::SnapshotBackfillStreamScan as u32)
+ != 0
+ {
+ in_map.insert(*fragment_id, relation_id as u32);
+ }
+ if (fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0 {
+ out_map.insert(*fragment_id, relation_id as u32);
+ }
+ }
+ }
+ FragmentVertexToRelationMap { in_map, out_map }
+ }
+ };
+ Ok(Json(map))
+ }
+
+ /// Provides a hierarchy of relation ids to fragments to actors.
pub async fn get_relation_id_infos(
Extension(srv): Extension,
) -> Result> {
@@ -473,6 +536,11 @@ pub(super) mod handlers {
Ok(srv.diagnose_command.report().await)
}
+ /// NOTE(kwannoel): Although we fetch the BP for the entire graph via this API,
+ /// the workload should be reasonable.
+ /// In most cases, we can safely assume each node has most 2 outgoing edges (e.g. join).
+ /// In such a scenario, the number of edges is linear to the number of nodes.
+ /// So the workload is proportional to the relation id graph we fetch in `get_relation_id_infos`.
pub async fn get_embedded_back_pressures(
Extension(srv): Extension,
) -> Result> {
@@ -522,6 +590,10 @@ impl DashboardService {
.route("/fragments2", get(list_fragments))
.route("/fragments/job_id/:job_id", get(list_fragments_by_job_id))
.route("/relation_id_infos", get(get_relation_id_infos))
+ .route(
+ "/fragment_vertex_to_relation_id_map",
+ get(get_fragment_vertex_to_relation_id_map),
+ )
.route("/views", get(list_views))
.route("/materialized_views", get(list_materialized_views))
.route("/tables", get(list_tables))