diff --git a/dashboard/components/RateBar.tsx b/dashboard/components/RateBar.tsx deleted file mode 100644 index 4c4c12cc32363..0000000000000 --- a/dashboard/components/RateBar.tsx +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2025 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { Box, Text, Tooltip } from "@chakra-ui/react" -import { tinycolor } from "@ctrl/tinycolor" -import { p50, p90, p95, p99 } from "../lib/api/metric" -import { MetricsSample } from "./metrics" - -export default function RateBar({ samples }: { samples: MetricsSample[] }) { - const p_50 = (p50(samples) * 100).toFixed(6) - const p_95 = (p95(samples) * 100).toFixed(6) - const p_99 = (p99(samples) * 100).toFixed(6) - const p_90 = p90(samples) * 100 - - const bgWidth = Math.ceil(p_90).toFixed(6) + "%" - const detailRate = `p50: ${p_50}% p95: ${p_95}% p99: ${p_99}%` - - // calculate gradient color - const colorRange = ["#C6F6D5", "#C53030"] - const endColor = tinycolor(colorRange[0]) - .mix(tinycolor(colorRange[1]), Math.ceil(p_90)) - .toHexString() - const bgGradient = `linear(to-r, ${colorRange[0]}, ${endColor})` - - return ( - - - p90: {p_90.toFixed(6)}% - - - ) -} diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts deleted file mode 100644 index 92172d9163a55..0000000000000 --- a/dashboard/lib/api/metric.ts +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Copyright 2025 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -import { MetricsSample } from "../../components/metrics" -import { - BackPressureInfo, - GetBackPressureResponse, -} from "../../proto/gen/monitor_service" -import api from "./api" - -export interface BackPressureRateInfo { - actorId: number - fragmentId: number - downstreamFragmentId: number - backPressureRate: number -} - -function convertToMapAndAgg( - backPressures: BackPressureInfo[] -): Map { - // FragmentId-downstreamFragmentId, total value - const mapValue = new Map() - // FragmentId-downstreamFragmentId, total count - const mapNumber = new Map() - // FragmentId-downstreamFragmentId, average value - const map = new Map() - for (const item of backPressures) { - const key = `${item.fragmentId}-${item.downstreamFragmentId}` - mapValue.set(key, (mapValue.get(key) || 0) + item.value) - mapNumber.set(key, (mapNumber.get(key) || 0) + item.actorCount) - } - - for (const [key, value] of mapValue) { - map.set(key, value / mapNumber.get(key)!) - } - return map -} - -function convertFromMapAndAgg( - map: Map -): BackPressureRateInfo[] { - const result: BackPressureRateInfo[] = [] - map.forEach((value, key) => { - const [fragmentId, downstreamFragmentId] = key.split("-").map(Number) - const backPressureRateInfo: BackPressureRateInfo = { - actorId: 0, - fragmentId, - downstreamFragmentId, - backPressureRate: value, - } - result.push(backPressureRateInfo) - }) - return result -} - -function convertToBackPressureMetrics( - bpRates: BackPressureRateInfo[] -): BackPressuresMetrics { - const bpMetrics: BackPressuresMetrics = { - outputBufferBlockingDuration: [], - } - for (const item of bpRates) { - bpMetrics.outputBufferBlockingDuration.push({ - metric: { - actorId: item.actorId.toString(), - fragmentId: item.fragmentId.toString(), - downstreamFragmentId: item.downstreamFragmentId.toString(), - }, - sample: [ - { - timestamp: Date.now(), - value: item.backPressureRate, - }, - ], - }) - } - return bpMetrics -} - -export function calculateCumulativeBp( - backPressureCumulative: BackPressureInfo[], - backPressureCurrent: BackPressureInfo[], - backPressureNew: BackPressureInfo[] -): BackPressureInfo[] { - let mapCumulative = convertToMapAndAgg(backPressureCumulative) - let mapCurrent = convertToMapAndAgg(backPressureCurrent) - let mapNew = convertToMapAndAgg(backPressureNew) - let mapResult = new Map() - let keys = new Set([ - ...mapCumulative.keys(), - ...mapCurrent.keys(), - ...mapNew.keys(), - ]) - keys.forEach((key) => { - let backpressureCumulativeValue = mapCumulative.get(key) || 0 - let backpressureCurrentValue = mapCurrent.get(key) || 0 - let backpressureNewValue = mapNew.get(key) || 0 - let increment = backpressureNewValue - backpressureCurrentValue - mapResult.set(key, backpressureCumulativeValue + increment) - }) - const result: BackPressureInfo[] = [] - mapResult.forEach((value, key) => { - const [fragmentId, downstreamFragmentId] = key.split("-").map(Number) - const backPressureInfo: BackPressureInfo = { - actorCount: 1, // the value here has already been averaged by real actor count - fragmentId, - downstreamFragmentId, - value, - } - result.push(backPressureInfo) - }) - return result -} - -export function calculateBPRate( - backPressureCumulative: BackPressureInfo[], - totalDurationNs: number -): BackPressuresMetrics { - let map = convertToMapAndAgg(backPressureCumulative) - let result = new Map() - map.forEach((backpressureNs, key) => { - let backpressureRateRatio = backpressureNs / totalDurationNs - let backpressureRatePercent = backpressureRateRatio * 100 - result.set(key, backpressureRatePercent) - }) - return convertToBackPressureMetrics(convertFromMapAndAgg(result)) -} - -// Get back pressure from meta node -> compute node -export async function fetchEmbeddedBackPressure() { - const response: GetBackPressureResponse = await api.get( - "/metrics/fragment/embedded_back_pressures" - ) - return response -} - -function calculatePercentile(samples: MetricsSample[], percentile: number) { - const sorted = samples.sort((a, b) => a.value - b.value) - const index = Math.floor(sorted.length * percentile) - return sorted[index].value -} - -export function p50(samples: MetricsSample[]) { - return calculatePercentile(samples, 0.5) -} - -export function p90(samples: MetricsSample[]) { - return calculatePercentile(samples, 0.9) -} - -export function p95(samples: MetricsSample[]) { - return calculatePercentile(samples, 0.95) -} - -export function p99(samples: MetricsSample[]) { - return calculatePercentile(samples, 0.99) -} - -function isSet(value: any): boolean { - return value !== null && value !== undefined -} diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 05fc616278911..43ca85cca0d79 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -42,12 +42,8 @@ import FragmentDependencyGraph from "../components/FragmentDependencyGraph" import FragmentGraph from "../components/FragmentGraph" import Title from "../components/Title" import useErrorToast from "../hook/useErrorToast" +import api from "../lib/api/api" import useFetch from "../lib/api/fetch" -import { - calculateBPRate, - calculateCumulativeBp, - fetchEmbeddedBackPressure, -} from "../lib/api/metric" import { getFragmentsByJobId, getRelationIdInfos, @@ -190,13 +186,43 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 225 -// The state of the embedded back pressure metrics. -// The metrics from previous fetch are stored here to calculate the rate. -interface EmbeddedBackPressureInfo { - previous: BackPressureInfo[] - current: BackPressureInfo[] - totalBackpressureNs: BackPressureInfo[] - totalDurationNs: number +export class BackPressureSnapshot { + // The first fetch result. + // key: `_` + // value: output blocking duration in nanoseconds. + result: Map + + // The time of the current fetch in milliseconds. (`Date.now()`) + time: number + + constructor(result: Map, time: number) { + this.result = result + this.time = time + } + + static fromResponse(channelStats: { + [key: string]: BackPressureInfo + }): BackPressureSnapshot { + const result = new Map() + for (const [key, info] of Object.entries(channelStats)) { + result.set(key, info.value / info.actorCount) + } + return new BackPressureSnapshot(result, Date.now()) + } + + getRate(initial: BackPressureSnapshot): Map { + const result = new Map() + for (const [key, value] of this.result) { + const initialValue = initial.result.get(key) + if (initialValue) { + result.set( + key, + (value - initialValue) / (this.time - initial.time) / 1000000 + ) + } + } + return result + } } export default function Streaming() { @@ -308,40 +334,30 @@ export default function Streaming() { toast(new Error(`Actor ${searchActorIdInt} not found`)) } - // Periodically fetch embedded back-pressure from Meta node - // Didn't call `useFetch()` because the `setState` way is special. - const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] = - useState() + // Keep the initial snapshot to calculate the rate of back pressure + const [backPressureRate, setBackPressureRate] = + useState>() + const [fragmentStats, setFragmentStats] = useState<{ [key: number]: FragmentStats }>() useEffect(() => { + // The initial snapshot is used to calculate the rate of back pressure + // It's not used to render the page directly, so we don't need to set it in the state + let initialSnapshot: BackPressureSnapshot | undefined + function refresh() { - fetchEmbeddedBackPressure().then( + api.get("/metrics/fragment/embedded_back_pressures").then( (response) => { - let newBP = - response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? [] - setEmbeddedBackPressureInfo((prev) => - prev - ? { - previous: prev.current, - current: newBP, - totalBackpressureNs: calculateCumulativeBp( - prev.totalBackpressureNs, - prev.current, - newBP - ), - totalDurationNs: - prev.totalDurationNs + INTERVAL_MS * 1000 * 1000, - } - : { - previous: newBP, // Use current value to show zero rate, but it's fine - current: newBP, - totalBackpressureNs: [], - totalDurationNs: 0, - } + let snapshot = BackPressureSnapshot.fromResponse( + response.channelStats ) + if (!initialSnapshot) { + initialSnapshot = snapshot + } else { + setBackPressureRate(snapshot.getRate(initialSnapshot!)) + } setFragmentStats(response.fragmentStats) }, (e) => { @@ -350,33 +366,13 @@ export default function Streaming() { } ) } - refresh() - const interval = setInterval(refresh, INTERVAL_MS) + refresh() // run once immediately + const interval = setInterval(refresh, INTERVAL_MS) // and then run every interval return () => { clearInterval(interval) } }, [toast]) - const backPressures = useMemo(() => { - if (embeddedBackPressureInfo) { - let map = new Map() - - if (embeddedBackPressureInfo) { - const metrics = calculateBPRate( - embeddedBackPressureInfo.totalBackpressureNs, - embeddedBackPressureInfo.totalDurationNs - ) - for (const m of metrics.outputBufferBlockingDuration) { - map.set( - `${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`, - m.sample[0].value - ) - } - } - return map - } - }, [embeddedBackPressureInfo]) - const retVal = ( Fragment Graph @@ -503,7 +499,7 @@ export default function Streaming() { selectedFragmentId={selectedFragmentId?.toString()} fragmentDependency={fragmentDependency} planNodeDependencies={planNodeDependencies} - backPressures={backPressures} + backPressures={backPressureRate} fragmentStats={fragmentStats} /> )} diff --git a/dashboard/pages/relation_graph.tsx b/dashboard/pages/relation_graph.tsx index 8ded8196e6130..68ebcd917e634 100644 --- a/dashboard/pages/relation_graph.tsx +++ b/dashboard/pages/relation_graph.tsx @@ -23,12 +23,8 @@ import { Fragment, useCallback, useEffect, useMemo, useState } from "react" import RelationGraph, { boxHeight, boxWidth } from "../components/RelationGraph" import Title from "../components/Title" import useErrorToast from "../hook/useErrorToast" +import api from "../lib/api/api" import useFetch from "../lib/api/fetch" -import { - calculateBPRate, - calculateCumulativeBp, - fetchEmbeddedBackPressure, -} from "../lib/api/metric" import { Relation, getFragmentVertexToRelationMap, @@ -37,20 +33,12 @@ import { relationIsStreamingJob, } from "../lib/api/streaming" import { RelationPoint } from "../lib/layout" -import { BackPressureInfo, RelationStats } from "../proto/gen/monitor_service" +import { RelationStats } from "../proto/gen/monitor_service" +import { BackPressureSnapshot } from "./fragment_graph" const SIDEBAR_WIDTH = "200px" const INTERVAL_MS = 5000 -// The state of the back pressure metrics. -// The metrics from previous fetch are stored here to calculate the rate. -interface EmbeddedBackPressureInfo { - previous: BackPressureInfo[] - current: BackPressureInfo[] - totalBackpressureNs: BackPressureInfo[] - totalDurationNs: number -} - function buildDependencyAsEdges( list: Relation[], relation_deps: Map @@ -107,43 +95,34 @@ export default function StreamingGraph() { const relationDependency = relationDependencyCallback() - // Periodically fetch back-pressure from Meta node - // Didn't call `useFetch()` because the `setState` way is special. - const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] = - useState() + // Periodically fetch fragment-level back-pressure from Meta node + const [backPressureRate, setBackPressureRate] = + useState>() const [relationStats, setRelationStats] = useState<{ [key: number]: RelationStats }>() useEffect(() => { + // The initial snapshot is used to calculate the rate of back pressure + // It's not used to render the page directly, so we don't need to set it in the state + let initialSnapshot: BackPressureSnapshot | undefined + if (resetEmbeddedBackPressures) { - setEmbeddedBackPressureInfo(undefined) + setBackPressureRate(undefined) toggleResetEmbeddedBackPressures() } + function refresh() { - fetchEmbeddedBackPressure().then( + api.get("/metrics/fragment/embedded_back_pressures").then( (response) => { - let newBP = response.backPressureInfos - setEmbeddedBackPressureInfo((prev) => - prev - ? { - previous: prev.current, - current: newBP, - totalBackpressureNs: calculateCumulativeBp( - prev.totalBackpressureNs, - prev.current, - newBP - ), - totalDurationNs: - prev.totalDurationNs + INTERVAL_MS * 1000 * 1000, - } - : { - previous: newBP, // Use current value to show zero rate, but it's fine - current: newBP, - totalBackpressureNs: [], - totalDurationNs: 0, - } + let snapshot = BackPressureSnapshot.fromResponse( + response.channelStats ) + if (!initialSnapshot) { + initialSnapshot = snapshot + } else { + setBackPressureRate(snapshot.getRate(initialSnapshot!)) + } setRelationStats(response.relationStats) }, (e) => { @@ -152,40 +131,34 @@ export default function StreamingGraph() { } ) } - refresh() - const interval = setInterval(refresh, INTERVAL_MS) + refresh() // run once immediately + const interval = setInterval(refresh, INTERVAL_MS) // and then run every interval return () => { clearInterval(interval) } }, [toast, resetEmbeddedBackPressures]) - // Get relationId-relationId -> backpressure rate map + // Convert fragment-level backpressure rate map to relation-level backpressure rate const backPressures: Map | undefined = useMemo(() => { if (!fragmentVertexToRelationMap) { return new Map() } let inMap = fragmentVertexToRelationMap.inMap let outMap = fragmentVertexToRelationMap.outMap - if (embeddedBackPressureInfo) { + if (backPressureRate) { let map = new Map() - - const metrics = calculateBPRate( - embeddedBackPressureInfo.totalBackpressureNs, - embeddedBackPressureInfo.totalDurationNs - ) - for (const m of metrics.outputBufferBlockingDuration) { - let output = Number(m.metric.fragmentId) - let input = Number(m.metric.downstreamFragmentId) - if (outMap[output] && inMap[input]) { - output = outMap[output] - input = inMap[input] - let key = `${output}_${input}` - map.set(key, m.sample[0].value) + for (const [key, value] of backPressureRate) { + const [outputFragment, inputFragment] = key.split("_").map(Number) + if (outMap[outputFragment] && inMap[inputFragment]) { + const outputRelation = outMap[outputFragment] + const inputRelation = inMap[inputFragment] + let key = `${outputRelation}_${inputRelation}` + map.set(key, value) } } return map } - }, [embeddedBackPressureInfo, fragmentVertexToRelationMap]) + }, [backPressureRate, fragmentVertexToRelationMap]) const retVal = ( diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto index 08fc97bb73f9b..abd366df3d308 100644 --- a/proto/monitor_service.proto +++ b/proto/monitor_service.proto @@ -55,9 +55,8 @@ message AnalyzeHeapResponse { message GetBackPressureRequest {} message BackPressureInfo { - uint32 fragment_id = 1; - uint32 downstream_fragment_id = 2; uint32 actor_count = 3; + // Sum of output blocking duration of all actors double value = 4; } @@ -72,7 +71,8 @@ message RelationStats { } message GetBackPressureResponse { - repeated BackPressureInfo back_pressure_infos = 1; + // Key: "_" + map channel_stats = 1; map fragment_stats = 2; map relation_stats = 3; } diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 78c5decd05287..3c52147ee5a85 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -312,7 +312,7 @@ impl MonitorService for MonitorServiceImpl { let actor_count: HashMap<_, _> = actor_count .iter() .map(|m| { - let fragment_id = get_label(m, "fragment_id").unwrap(); + let fragment_id: u32 = get_label(m, "fragment_id").unwrap(); let count = m.get_gauge().get_value() as u32; (fragment_id, count) }) @@ -337,7 +337,7 @@ impl MonitorService for MonitorServiceImpl { .unwrap() .take_metric(); for m in &actor_current_epoch { - let fragment_id = get_label(m, "fragment_id").unwrap(); + let fragment_id: u32 = get_label(m, "fragment_id").unwrap(); let epoch = m.get_gauge().get_value() as u64; if let Some(s) = fragment_stats.get_mut(&fragment_id) { s.current_epoch = if s.current_epoch == 0 { @@ -362,7 +362,7 @@ impl MonitorService for MonitorServiceImpl { .unwrap() .take_metric(); for m in &mview_current_epoch { - let table_id = get_label(m, "table_id").unwrap(); + let table_id: u32 = get_label(m, "table_id").unwrap(); let epoch = m.get_gauge().get_value() as u64; if let Some(s) = relation_stats.get_mut(&table_id) { s.current_epoch = if s.current_epoch == 0 { @@ -382,44 +382,33 @@ impl MonitorService for MonitorServiceImpl { } } - let mut back_pressure_infos: HashMap<_, BackPressureInfo> = HashMap::new(); + let mut channel_stats: HashMap<_, BackPressureInfo> = HashMap::new(); - for label_pairs in actor_output_buffer_blocking_duration_ns { - let mut fragment_id = None; - let mut downstream_fragment_id = None; - for label_pair in label_pairs.get_label() { - if label_pair.get_name() == "fragment_id" { - fragment_id = label_pair.get_value().parse::().ok(); - } - if label_pair.get_name() == "downstream_fragment_id" { - downstream_fragment_id = label_pair.get_value().parse::().ok(); - } - } - let Some(fragment_id) = fragment_id else { - continue; - }; - let Some(downstream_fragment_id) = downstream_fragment_id else { - continue; - }; + for metric in actor_output_buffer_blocking_duration_ns { + let fragment_id: u32 = get_label(&metric, "fragment_id").unwrap(); + let downstream_fragment_id: u32 = get_label(&metric, "downstream_fragment_id").unwrap(); - // When metrics level is Debug, we may have multiple metrics with the same label pairs - // (fragment_id, downstream_fragment_id). We need to aggregate them locally. - // - // Metrics from different compute nodes should be aggregated by the caller. - let back_pressure_info = back_pressure_infos - .entry((fragment_id, downstream_fragment_id)) + let key = format!("{}_{}", fragment_id, downstream_fragment_id); + let channel_stat = channel_stats + .entry(key) .or_insert_with(|| BackPressureInfo { - fragment_id, - downstream_fragment_id, - actor_count: actor_count.get(&fragment_id).copied().unwrap_or_default(), + actor_count: 0, value: 0., }); - back_pressure_info.value += label_pairs.get_counter().get_value(); + // When metrics level is Debug, `actor_id` will be removed to reduce metrics. + // See `src/common/metrics/src/relabeled_metric.rs` + channel_stat.actor_count += + if get_label::(&metric, "actor_id").unwrap().is_empty() { + actor_count[&fragment_id] + } else { + 1 + }; + channel_stat.value += metric.get_counter().get_value(); } Ok(Response::new(GetBackPressureResponse { - back_pressure_infos: back_pressure_infos.into_values().collect(), + channel_stats, fragment_stats, relation_stats, })) @@ -498,13 +487,13 @@ impl MonitorService for MonitorServiceImpl { } } -fn get_label(metric: &Metric, label: &str) -> Option { +fn get_label(metric: &Metric, label: &str) -> Option { metric .get_label() .iter() .find(|lp| lp.get_name() == label)? .get_value() - .parse::() + .parse::() .ok() } diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index caff433a79b6e..8153b102ae9da 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -489,8 +489,6 @@ pub(super) mod handlers { let result = result .map_err(|_| anyhow!("Failed to get back pressure")) .map_err(err)?; - // TODO(eric): aggregate back_pressure_infos here - all.back_pressure_infos.extend(result.back_pressure_infos); // Aggregate fragment_stats for (fragment_id, fragment_stats) in result.fragment_stats { @@ -511,6 +509,16 @@ pub(super) mod handlers { all.relation_stats.insert(relation_id, relation_stats); } } + + // Aggregate channel_stats + for (key, channel_stats) in result.channel_stats { + if let Some(s) = all.channel_stats.get_mut(&key) { + s.actor_count += channel_stats.actor_count; + s.value += channel_stats.value; + } else { + all.channel_stats.insert(key, channel_stats); + } + } } Ok(all.into())