diff --git a/dashboard/components/RateBar.tsx b/dashboard/components/RateBar.tsx
deleted file mode 100644
index 4c4c12cc32363..0000000000000
--- a/dashboard/components/RateBar.tsx
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2025 RisingWave Labs
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import { Box, Text, Tooltip } from "@chakra-ui/react"
-import { tinycolor } from "@ctrl/tinycolor"
-import { p50, p90, p95, p99 } from "../lib/api/metric"
-import { MetricsSample } from "./metrics"
-
-export default function RateBar({ samples }: { samples: MetricsSample[] }) {
- const p_50 = (p50(samples) * 100).toFixed(6)
- const p_95 = (p95(samples) * 100).toFixed(6)
- const p_99 = (p99(samples) * 100).toFixed(6)
- const p_90 = p90(samples) * 100
-
- const bgWidth = Math.ceil(p_90).toFixed(6) + "%"
- const detailRate = `p50: ${p_50}% p95: ${p_95}% p99: ${p_99}%`
-
- // calculate gradient color
- const colorRange = ["#C6F6D5", "#C53030"]
- const endColor = tinycolor(colorRange[0])
- .mix(tinycolor(colorRange[1]), Math.ceil(p_90))
- .toHexString()
- const bgGradient = `linear(to-r, ${colorRange[0]}, ${endColor})`
-
- return (
-
-
- p90: {p_90.toFixed(6)}%
-
-
- )
-}
diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts
deleted file mode 100644
index 92172d9163a55..0000000000000
--- a/dashboard/lib/api/metric.ts
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Copyright 2025 RisingWave Labs
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-import { MetricsSample } from "../../components/metrics"
-import {
- BackPressureInfo,
- GetBackPressureResponse,
-} from "../../proto/gen/monitor_service"
-import api from "./api"
-
-export interface BackPressureRateInfo {
- actorId: number
- fragmentId: number
- downstreamFragmentId: number
- backPressureRate: number
-}
-
-function convertToMapAndAgg(
- backPressures: BackPressureInfo[]
-): Map {
- // FragmentId-downstreamFragmentId, total value
- const mapValue = new Map()
- // FragmentId-downstreamFragmentId, total count
- const mapNumber = new Map()
- // FragmentId-downstreamFragmentId, average value
- const map = new Map()
- for (const item of backPressures) {
- const key = `${item.fragmentId}-${item.downstreamFragmentId}`
- mapValue.set(key, (mapValue.get(key) || 0) + item.value)
- mapNumber.set(key, (mapNumber.get(key) || 0) + item.actorCount)
- }
-
- for (const [key, value] of mapValue) {
- map.set(key, value / mapNumber.get(key)!)
- }
- return map
-}
-
-function convertFromMapAndAgg(
- map: Map
-): BackPressureRateInfo[] {
- const result: BackPressureRateInfo[] = []
- map.forEach((value, key) => {
- const [fragmentId, downstreamFragmentId] = key.split("-").map(Number)
- const backPressureRateInfo: BackPressureRateInfo = {
- actorId: 0,
- fragmentId,
- downstreamFragmentId,
- backPressureRate: value,
- }
- result.push(backPressureRateInfo)
- })
- return result
-}
-
-function convertToBackPressureMetrics(
- bpRates: BackPressureRateInfo[]
-): BackPressuresMetrics {
- const bpMetrics: BackPressuresMetrics = {
- outputBufferBlockingDuration: [],
- }
- for (const item of bpRates) {
- bpMetrics.outputBufferBlockingDuration.push({
- metric: {
- actorId: item.actorId.toString(),
- fragmentId: item.fragmentId.toString(),
- downstreamFragmentId: item.downstreamFragmentId.toString(),
- },
- sample: [
- {
- timestamp: Date.now(),
- value: item.backPressureRate,
- },
- ],
- })
- }
- return bpMetrics
-}
-
-export function calculateCumulativeBp(
- backPressureCumulative: BackPressureInfo[],
- backPressureCurrent: BackPressureInfo[],
- backPressureNew: BackPressureInfo[]
-): BackPressureInfo[] {
- let mapCumulative = convertToMapAndAgg(backPressureCumulative)
- let mapCurrent = convertToMapAndAgg(backPressureCurrent)
- let mapNew = convertToMapAndAgg(backPressureNew)
- let mapResult = new Map()
- let keys = new Set([
- ...mapCumulative.keys(),
- ...mapCurrent.keys(),
- ...mapNew.keys(),
- ])
- keys.forEach((key) => {
- let backpressureCumulativeValue = mapCumulative.get(key) || 0
- let backpressureCurrentValue = mapCurrent.get(key) || 0
- let backpressureNewValue = mapNew.get(key) || 0
- let increment = backpressureNewValue - backpressureCurrentValue
- mapResult.set(key, backpressureCumulativeValue + increment)
- })
- const result: BackPressureInfo[] = []
- mapResult.forEach((value, key) => {
- const [fragmentId, downstreamFragmentId] = key.split("-").map(Number)
- const backPressureInfo: BackPressureInfo = {
- actorCount: 1, // the value here has already been averaged by real actor count
- fragmentId,
- downstreamFragmentId,
- value,
- }
- result.push(backPressureInfo)
- })
- return result
-}
-
-export function calculateBPRate(
- backPressureCumulative: BackPressureInfo[],
- totalDurationNs: number
-): BackPressuresMetrics {
- let map = convertToMapAndAgg(backPressureCumulative)
- let result = new Map()
- map.forEach((backpressureNs, key) => {
- let backpressureRateRatio = backpressureNs / totalDurationNs
- let backpressureRatePercent = backpressureRateRatio * 100
- result.set(key, backpressureRatePercent)
- })
- return convertToBackPressureMetrics(convertFromMapAndAgg(result))
-}
-
-// Get back pressure from meta node -> compute node
-export async function fetchEmbeddedBackPressure() {
- const response: GetBackPressureResponse = await api.get(
- "/metrics/fragment/embedded_back_pressures"
- )
- return response
-}
-
-function calculatePercentile(samples: MetricsSample[], percentile: number) {
- const sorted = samples.sort((a, b) => a.value - b.value)
- const index = Math.floor(sorted.length * percentile)
- return sorted[index].value
-}
-
-export function p50(samples: MetricsSample[]) {
- return calculatePercentile(samples, 0.5)
-}
-
-export function p90(samples: MetricsSample[]) {
- return calculatePercentile(samples, 0.9)
-}
-
-export function p95(samples: MetricsSample[]) {
- return calculatePercentile(samples, 0.95)
-}
-
-export function p99(samples: MetricsSample[]) {
- return calculatePercentile(samples, 0.99)
-}
-
-function isSet(value: any): boolean {
- return value !== null && value !== undefined
-}
diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx
index 05fc616278911..43ca85cca0d79 100644
--- a/dashboard/pages/fragment_graph.tsx
+++ b/dashboard/pages/fragment_graph.tsx
@@ -42,12 +42,8 @@ import FragmentDependencyGraph from "../components/FragmentDependencyGraph"
import FragmentGraph from "../components/FragmentGraph"
import Title from "../components/Title"
import useErrorToast from "../hook/useErrorToast"
+import api from "../lib/api/api"
import useFetch from "../lib/api/fetch"
-import {
- calculateBPRate,
- calculateCumulativeBp,
- fetchEmbeddedBackPressure,
-} from "../lib/api/metric"
import {
getFragmentsByJobId,
getRelationIdInfos,
@@ -190,13 +186,43 @@ function buildFragmentDependencyAsEdges(
const SIDEBAR_WIDTH = 225
-// The state of the embedded back pressure metrics.
-// The metrics from previous fetch are stored here to calculate the rate.
-interface EmbeddedBackPressureInfo {
- previous: BackPressureInfo[]
- current: BackPressureInfo[]
- totalBackpressureNs: BackPressureInfo[]
- totalDurationNs: number
+export class BackPressureSnapshot {
+ // The first fetch result.
+ // key: `_`
+ // value: output blocking duration in nanoseconds.
+ result: Map
+
+ // The time of the current fetch in milliseconds. (`Date.now()`)
+ time: number
+
+ constructor(result: Map, time: number) {
+ this.result = result
+ this.time = time
+ }
+
+ static fromResponse(channelStats: {
+ [key: string]: BackPressureInfo
+ }): BackPressureSnapshot {
+ const result = new Map()
+ for (const [key, info] of Object.entries(channelStats)) {
+ result.set(key, info.value / info.actorCount)
+ }
+ return new BackPressureSnapshot(result, Date.now())
+ }
+
+ getRate(initial: BackPressureSnapshot): Map {
+ const result = new Map()
+ for (const [key, value] of this.result) {
+ const initialValue = initial.result.get(key)
+ if (initialValue) {
+ result.set(
+ key,
+ (value - initialValue) / (this.time - initial.time) / 1000000
+ )
+ }
+ }
+ return result
+ }
}
export default function Streaming() {
@@ -308,40 +334,30 @@ export default function Streaming() {
toast(new Error(`Actor ${searchActorIdInt} not found`))
}
- // Periodically fetch embedded back-pressure from Meta node
- // Didn't call `useFetch()` because the `setState` way is special.
- const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] =
- useState()
+ // Keep the initial snapshot to calculate the rate of back pressure
+ const [backPressureRate, setBackPressureRate] =
+ useState