diff --git a/dashboard/README.md b/dashboard/README.md index 0c321a2a07de..1212aef93568 100644 --- a/dashboard/README.md +++ b/dashboard/README.md @@ -33,8 +33,10 @@ For example: ```bash ./risedev d -sqllogictest -p 4566 -d dev './e2e_test/nexmark/create_tables.slt.part' -sqllogictest -p 4566 -d dev './e2e_test/streaming/nexmark/create_views.slt.part' +./risedev slt e2e_test/nexmark/create_sources.slt.part +./risedev psql -c 'CREATE TABLE dimension (v1 int);' +./risedev psql -c 'CREATE MATERIALIZED VIEW mv AS SELECT auction.* FROM dimension join auction on auction.id-auction.id = dimension.v1;' +./risedev psql -c 'INSERT INTO dimension select 0 from generate_series(1, 50);' ``` Install dependencies and start the development server. diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts index 1fabec81f94f..ba3269041206 100644 --- a/dashboard/lib/api/metric.ts +++ b/dashboard/lib/api/metric.ts @@ -112,28 +112,52 @@ function convertToBackPressureMetrics( return bpMetrics } -export function calculateBPRate( - backPressureNew: BackPressureInfo[], - backPressureOld: BackPressureInfo[], - intervalMs: number -): BackPressuresMetrics { +export function calculateCumulativeBp( + backPressureCumulative: BackPressureInfo[], + backPressureCurrent: BackPressureInfo[], + backPressureNew: BackPressureInfo[] +): BackPressureInfo[] { + let mapCumulative = convertToMapAndAgg(backPressureCumulative) + let mapCurrent = convertToMapAndAgg(backPressureCurrent) let mapNew = convertToMapAndAgg(backPressureNew) - let mapOld = convertToMapAndAgg(backPressureOld) - let result = new Map() - mapNew.forEach((value, key) => { - if (mapOld.has(key)) { - result.set( - key, - // The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing - ((value - (mapOld.get(key) || 0)) / - ((intervalMs / 1000) * 1000000000)) * - 100 - ) - } else { - result.set(key, 0) + let mapResult = new Map() + let keys = new Set([ + ...mapCumulative.keys(), + ...mapCurrent.keys(), + ...mapNew.keys(), + ]) + keys.forEach((key) => { + let backpressureCumulativeValue = mapCumulative.get(key) || 0 + let backpressureCurrentValue = mapCurrent.get(key) || 0 + let backpressureNewValue = mapNew.get(key) || 0 + let increment = backpressureNewValue - backpressureCurrentValue + mapResult.set(key, backpressureCumulativeValue + increment) + }) + const result: BackPressureInfo[] = [] + mapResult.forEach((value, key) => { + const [fragmentId, downstreamFragmentId] = key.split("-").map(Number) + const backPressureInfo: BackPressureInfo = { + actorId: 0, + fragmentId, + downstreamFragmentId, + value, } + result.push(backPressureInfo) }) + return result +} +export function calculateBPRate( + backPressureCumulative: BackPressureInfo[], + totalDurationNs: number +): BackPressuresMetrics { + let map = convertToMapAndAgg(backPressureCumulative) + let result = new Map() + map.forEach((backpressureNs, key) => { + let backpressureRateRatio = backpressureNs / totalDurationNs + let backpressureRatePercent = backpressureRateRatio * 100 + result.set(key, backpressureRatePercent) + }) return convertToBackPressureMetrics(convertFromMapAndAgg(result)) } diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index f77ab2b5a30e..1c5f6d774653 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -41,6 +41,7 @@ import useFetch from "../lib/api/fetch" import { BackPressureInfo, calculateBPRate, + calculateCumulativeBp, fetchEmbeddedBackPressure, fetchPrometheusBackPressure, } from "../lib/api/metric" @@ -54,7 +55,7 @@ interface DispatcherNode { } // Refresh interval (ms) for back pressure stats -const INTERVAL = 5000 +const INTERVAL_MS = 5000 /** Associated data of each plan node in the fragment graph, including the dispatchers. */ export interface PlanNodeDatum { @@ -187,6 +188,8 @@ const backPressureDataSources: BackPressureDataSource[] = [ interface EmbeddedBackPressureInfo { previous: BackPressureInfo[] current: BackPressureInfo[] + totalBackpressureNs: BackPressureInfo[] + totalDurationNs: number } export default function Streaming() { @@ -293,7 +296,7 @@ export default function Streaming() { // Periodically fetch Prometheus back-pressure from Meta node const { response: promethusMetrics } = useFetch( fetchPrometheusBackPressure, - INTERVAL, + INTERVAL_MS, backPressureDataSource === "Prometheus" ) @@ -312,10 +315,19 @@ export default function Streaming() { ? { previous: prev.current, current: newBP, + totalBackpressureNs: calculateCumulativeBp( + prev.totalBackpressureNs, + prev.current, + newBP + ), + totalDurationNs: + prev.totalDurationNs + INTERVAL_MS * 1000 * 1000, } : { previous: newBP, // Use current value to show zero rate, but it's fine current: newBP, + totalBackpressureNs: [], + totalDurationNs: 0, } ) }, @@ -324,7 +336,7 @@ export default function Streaming() { toast(e, "error") } ) - }, INTERVAL) + }, INTERVAL_MS) return () => { clearInterval(interval) } @@ -337,9 +349,8 @@ export default function Streaming() { if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) { const metrics = calculateBPRate( - embeddedBackPressureInfo.current, - embeddedBackPressureInfo.previous, - INTERVAL + embeddedBackPressureInfo.totalBackpressureNs, + embeddedBackPressureInfo.totalDurationNs ) for (const m of metrics.outputBufferBlockingDuration) { map.set(