From 50c856c9c22e3e1794e5ab16c10f52922ad8abca Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 1 Mar 2024 17:07:57 +0800 Subject: [PATCH 01/13] remove the p50/90/99 backpressure --- dashboard/lib/api/metric.ts | 10 ++++- dashboard/pages/fragment_graph.tsx | 59 +++--------------------------- 2 files changed, 14 insertions(+), 55 deletions(-) diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts index 1b36be23e0d02..a155909eb0d65 100644 --- a/dashboard/lib/api/metric.ts +++ b/dashboard/lib/api/metric.ts @@ -125,7 +125,7 @@ export function calculateBPRate( key, // The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing ((value - (mapOld.get(key) || 0)) / ((INTERVAL / 1000) * 1000000000)) * - 100 + 100 ) } else { result.set(key, 0) @@ -180,6 +180,14 @@ export function p99(samples: MetricsSample[]) { return calculatePercentile(samples, 0.99) } +export function average(samples: MetricsSample[]) { + let sum = 0 + for (const sample of samples) { + sum += sample.value + } + return sum / samples.length +} + function isSet(value: any): boolean { return value !== null && value !== undefined } diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index e5179e69fdce4..ad9806643352a 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -42,13 +42,10 @@ import { BackPressureInfo, BackPressuresMetrics, INTERVAL, + average, calculateBPRate, getActorBackPressures, getBackPressureWithoutPrometheus, - p50, - p90, - p95, - p99, } from "../lib/api/metric" import { getFragments, getStreamingJobs } from "../lib/api/streaming" import { FragmentBox } from "../lib/layout" @@ -179,9 +176,6 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 200 -type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" -const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99"] - type BackPressureDataSource = "Embedded" | "Prometheus" const backPressureDataSources: BackPressureDataSource[] = [ "Embedded", @@ -193,7 +187,6 @@ export default function Streaming() { const { response: fragmentList } = useFetch(getFragments) const [relationId, setRelationId] = useQueryState("id", parseAsInteger) - const [backPressureAlgo, setBackPressureAlgo] = useQueryState("backPressure") const [selectedFragmentId, setSelectedFragmentId] = useState() // used to get the data source const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] = @@ -202,7 +195,7 @@ export default function Streaming() { const { response: actorBackPressures } = useFetch( getActorBackPressures, INTERVAL, - backPressureDataSourceAlgo === "Prometheus" && backPressureAlgo !== null + backPressureDataSourceAlgo === "Prometheus" ) const fragmentDependencyCallback = useCallback(() => { @@ -230,7 +223,7 @@ export default function Streaming() { } } } - return () => {} + return () => { } }, [relationId, relationList, setRelationId]) // get back pressure rate without prometheus @@ -330,10 +323,7 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if ( - (actorBackPressures && backPressureAlgo && backPressureAlgo) || - backPressuresMetricsWithoutPromtheus - ) { + if (actorBackPressures || backPressuresMetricsWithoutPromtheus) { let map = new Map() if ( @@ -351,25 +341,7 @@ export default function Streaming() { actorBackPressures ) { for (const m of actorBackPressures.outputBufferBlockingDuration) { - let algoFunc - switch (backPressureAlgo) { - case "p50": - algoFunc = p50 - break - case "p90": - algoFunc = p90 - break - case "p95": - algoFunc = p95 - break - case "p99": - algoFunc = p99 - break - default: - return - } - - const value = algoFunc(m.sample) * 100 + const value = average(m.sample) * 100 map.set( `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, value @@ -381,7 +353,6 @@ export default function Streaming() { }, [ backPressureDataSourceAlgo, actorBackPressures, - backPressureAlgo, backPressuresMetricsWithoutPromtheus, ]) @@ -468,26 +439,6 @@ export default function Streaming() { ))} - {backPressureDataSourceAlgo === "Prometheus" && ( - - Back Pressure Algorithm - - - )} Fragments {fragmentDependencyDag && ( From 4129e3aa870f27f2c89bfcac40f7854d4b0f3abf Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 1 Mar 2024 17:09:42 +0800 Subject: [PATCH 02/13] rename --- dashboard/lib/api/metric.ts | 3 +- dashboard/pages/fragment_graph.tsx | 49 +++++++++++++++--------------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts index a155909eb0d65..647419672f7b6 100644 --- a/dashboard/lib/api/metric.ts +++ b/dashboard/lib/api/metric.ts @@ -17,7 +17,6 @@ import { Metrics, MetricsSample } from "../../components/metrics" import api from "./api" -export const INTERVAL = 5000 export interface BackPressuresMetrics { outputBufferBlockingDuration: Metrics[] } @@ -149,7 +148,7 @@ export const BackPressureInfo = { } // Get back pressure from meta node -> compute node -export async function getBackPressureWithoutPrometheus() { +export async function fetchEmbeddedBackPressure() { const response = await api.get("/metrics/fragment/embedded_back_pressures") let backPressureInfos: BackPressureInfo[] = response.backPressureInfos.map( BackPressureInfo.fromJSON diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index ad9806643352a..c47d00d21674b 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -41,11 +41,10 @@ import useFetch from "../lib/api/fetch" import { BackPressureInfo, BackPressuresMetrics, - INTERVAL, average, calculateBPRate, getActorBackPressures, - getBackPressureWithoutPrometheus, + fetchEmbeddedBackPressure, } from "../lib/api/metric" import { getFragments, getStreamingJobs } from "../lib/api/streaming" import { FragmentBox } from "../lib/layout" @@ -56,6 +55,9 @@ interface DispatcherNode { [actorId: number]: Dispatcher[] } +// Refresh interval (ms) for back pressure stats +const INTERVAL = 5000 + /** Associated data of each plan node in the fragment graph, including the dispatchers. */ export interface PlanNodeDatum { name: string @@ -189,13 +191,13 @@ export default function Streaming() { const [relationId, setRelationId] = useQueryState("id", parseAsInteger) const [selectedFragmentId, setSelectedFragmentId] = useState() // used to get the data source - const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] = + const [backPressureDataSource, setBackPressureDataSource] = useState("Embedded") - const { response: actorBackPressures } = useFetch( + const { response: promethusMetrics } = useFetch( getActorBackPressures, INTERVAL, - backPressureDataSourceAlgo === "Prometheus" + backPressureDataSource === "Prometheus" ) const fragmentDependencyCallback = useCallback(() => { @@ -212,7 +214,6 @@ export default function Streaming() { } } } - return undefined }, [fragmentList, relationId]) useEffect(() => { @@ -223,21 +224,21 @@ export default function Streaming() { } } } - return () => { } }, [relationId, relationList, setRelationId]) - // get back pressure rate without prometheus - const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = + // get embedded back pressure rate from meta node + const [embeddedMetrics, setBackPressuresMetrics] = useState() const [previousBP, setPreviousBP] = useState([]) const [currentBP, setCurrentBP] = useState([]) const toast = useErrorToast() useEffect(() => { - if (backPressureDataSourceAlgo === "Embedded") { + if (backPressureDataSource === "Embedded") { const interval = setInterval(() => { const fetchNewBP = async () => { - const newBP = await getBackPressureWithoutPrometheus() + const newBP = await fetchEmbeddedBackPressure() + console.log(newBP) setPreviousBP(currentBP) setCurrentBP(newBP) } @@ -246,7 +247,7 @@ export default function Streaming() { }, INTERVAL) return () => clearInterval(interval) } - }, [currentBP, backPressureDataSourceAlgo]) + }, [currentBP, backPressureDataSource]) useEffect(() => { if (currentBP !== null && previousBP !== null) { @@ -323,24 +324,24 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if (actorBackPressures || backPressuresMetricsWithoutPromtheus) { + if (promethusMetrics || embeddedMetrics) { let map = new Map() if ( - backPressureDataSourceAlgo === "Embedded" && - backPressuresMetricsWithoutPromtheus + backPressureDataSource === "Embedded" && + embeddedMetrics ) { - for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { + for (const m of embeddedMetrics.outputBufferBlockingDuration) { map.set( `${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`, m.sample[0].value ) } } else if ( - backPressureDataSourceAlgo !== "Embedded" && - actorBackPressures + backPressureDataSource !== "Embedded" && + promethusMetrics ) { - for (const m of actorBackPressures.outputBufferBlockingDuration) { + for (const m of promethusMetrics.outputBufferBlockingDuration) { const value = average(m.sample) * 100 map.set( `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, @@ -351,9 +352,9 @@ export default function Streaming() { return map } }, [ - backPressureDataSourceAlgo, - actorBackPressures, - backPressuresMetricsWithoutPromtheus, + backPressureDataSource, + promethusMetrics, + embeddedMetrics, ]) const retVal = ( @@ -427,9 +428,9 @@ export default function Streaming() { Back Pressure Data Source diff --git a/src/meta/src/dashboard/prometheus.rs b/src/meta/src/dashboard/prometheus.rs index 7baade4969749..22b41cb2f3506 100644 --- a/src/meta/src/dashboard/prometheus.rs +++ b/src/meta/src/dashboard/prometheus.rs @@ -17,7 +17,7 @@ use std::time::SystemTime; use anyhow::anyhow; use axum::{Extension, Json}; -use prometheus_http_query::response::{RangeVector, Sample}; +use prometheus_http_query::response::{InstantVector, RangeVector, Sample}; use serde::Serialize; use super::handlers::{err, DashboardError}; @@ -53,6 +53,17 @@ impl From<&RangeVector> for PrometheusVector { } } +// Note(eric): For backward compatibility, we store the `InstantVector` as a single sample, +// instead of defining a new struct. +impl From<&InstantVector> for PrometheusVector { + fn from(value: &InstantVector) -> Self { + PrometheusVector { + metric: value.metric().clone(), + sample: vec![PrometheusSample::from(value.sample())], + } + } +} + #[derive(Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct ClusterMetrics { @@ -134,27 +145,12 @@ pub async fn list_prometheus_fragment_back_pressure( Extension(srv): Extension, ) -> Result> { if let Some(ref client) = srv.prometheus_client { - let now = SystemTime::now(); let back_pressure_query = format!("avg(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) / 1000000000", srv.prometheus_selector); - let result = client - .query_range( - back_pressure_query, - now.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64 - - 1800, - now.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64, - 15.0, - ) - .get() - .await - .map_err(err)?; + let result = client.query(back_pressure_query).get().await.map_err(err)?; let back_pressure_data = result .data() - .as_matrix() + .as_vector() .unwrap() .iter() .map(PrometheusVector::from) From 96de9ddaf6e04cb90478259183192efe76509cc7 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Sat, 2 Mar 2024 16:15:09 +0800 Subject: [PATCH 08/13] clean --- dashboard/lib/api/metric.ts | 8 -------- dashboard/pages/fragment_graph.tsx | 13 ++++++------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/dashboard/lib/api/metric.ts b/dashboard/lib/api/metric.ts index a0f4232a2ec4e..4d49d941c99e7 100644 --- a/dashboard/lib/api/metric.ts +++ b/dashboard/lib/api/metric.ts @@ -181,14 +181,6 @@ export function p99(samples: MetricsSample[]) { return calculatePercentile(samples, 0.99) } -export function average(samples: MetricsSample[]) { - let sum = 0 - for (const sample of samples) { - sum += sample.value - } - return sum / samples.length -} - function isSet(value: any): boolean { return value !== null && value !== undefined } diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index a3b5c6ced8395..c85dd1942ba9e 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -40,7 +40,6 @@ import useErrorToast from "../hook/useErrorToast" import useFetch from "../lib/api/fetch" import { BackPressureInfo, - average, calculateBPRate, fetchEmbeddedBackPressure, fetchPrometheusBackPressure, @@ -311,13 +310,13 @@ export default function Streaming() { setEmbeddedBackPressureInfo((prev) => prev ? { - previous: prev.current, - current: newBP, - } + previous: prev.current, + current: newBP, + } : { - previous: newBP, // Use current value to show zero rate, but it's fine - current: newBP, - } + previous: newBP, // Use current value to show zero rate, but it's fine + current: newBP, + } ) }, (e) => { From 6c185e7959f3c04e7288599b8760827cf2669906 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Mon, 4 Mar 2024 18:06:42 +0800 Subject: [PATCH 09/13] port some nice things from bugen's PR --- dashboard/pages/fragment_graph.tsx | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index c85dd1942ba9e..d1d6b3fb1a23d 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -288,7 +288,7 @@ export default function Streaming() { } const [backPressureDataSource, setBackPressureDataSource] = - useState("Embedded") + useState("Embedded") // Periodically fetch Prometheus back-pressure from Meta node const { response: promethusMetrics } = useFetch( @@ -354,10 +354,19 @@ export default function Streaming() { } } else if (backPressureDataSource === "Prometheus" && promethusMetrics) { for (const m of promethusMetrics.outputBufferBlockingDuration) { - map.set( - `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, - m.sample[0].value * 100 - ) + if (m.sample.length > 0) { + // Note: We issue an instant query to Prometheus to get the most recent value. + // So there should be only one sample here. + // + // Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still + // possible that an old version of meta service returns a range-query result. + // So we take the one with the latest timestamp here. + const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100 + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + value + ) + } } } return map @@ -437,7 +446,9 @@ export default function Streaming() {