diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index e5179e69fdce4..926ed767b93ee 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -45,10 +45,6 @@ import { calculateBPRate, getActorBackPressures, getBackPressureWithoutPrometheus, - p50, - p90, - p95, - p99, } from "../lib/api/metric" import { getFragments, getStreamingJobs } from "../lib/api/streaming" import { FragmentBox } from "../lib/layout" @@ -179,9 +175,6 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 200 -type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" -const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99"] - type BackPressureDataSource = "Embedded" | "Prometheus" const backPressureDataSources: BackPressureDataSource[] = [ "Embedded", @@ -193,16 +186,15 @@ export default function Streaming() { const { response: fragmentList } = useFetch(getFragments) const [relationId, setRelationId] = useQueryState("id", parseAsInteger) - const [backPressureAlgo, setBackPressureAlgo] = useQueryState("backPressure") const [selectedFragmentId, setSelectedFragmentId] = useState() // used to get the data source - const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] = - useState("Embedded") + const [backPressureDataSource, setBackPressureDataSource] = + useState("Embedded") const { response: actorBackPressures } = useFetch( getActorBackPressures, INTERVAL, - backPressureDataSourceAlgo === "Prometheus" && backPressureAlgo !== null + backPressureDataSource === "Prometheus" ) const fragmentDependencyCallback = useCallback(() => { @@ -234,6 +226,8 @@ export default function Streaming() { }, [relationId, relationList, setRelationId]) // get back pressure rate without prometheus + // TODO(bugen): extract the following logic to a hook and unify the interface + // with Prometheus data source. const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = useState() const [previousBP, setPreviousBP] = useState([]) @@ -241,7 +235,7 @@ export default function Streaming() { const toast = useErrorToast() useEffect(() => { - if (backPressureDataSourceAlgo === "Embedded") { + if (backPressureDataSource === "Embedded") { const interval = setInterval(() => { const fetchNewBP = async () => { const newBP = await getBackPressureWithoutPrometheus() @@ -253,7 +247,7 @@ export default function Streaming() { }, INTERVAL) return () => clearInterval(interval) } - }, [currentBP, backPressureDataSourceAlgo]) + }, [currentBP, backPressureDataSource]) useEffect(() => { if (currentBP !== null && previousBP !== null) { @@ -330,14 +324,11 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if ( - (actorBackPressures && backPressureAlgo && backPressureAlgo) || - backPressuresMetricsWithoutPromtheus - ) { + if (actorBackPressures || backPressuresMetricsWithoutPromtheus) { let map = new Map() if ( - backPressureDataSourceAlgo === "Embedded" && + backPressureDataSource === "Embedded" && backPressuresMetricsWithoutPromtheus ) { for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { @@ -347,41 +338,32 @@ export default function Streaming() { ) } } else if ( - backPressureDataSourceAlgo !== "Embedded" && + backPressureDataSource === "Prometheus" && actorBackPressures ) { - for (const m of actorBackPressures.outputBufferBlockingDuration) { - let algoFunc - switch (backPressureAlgo) { - case "p50": - algoFunc = p50 - break - case "p90": - algoFunc = p90 - break - case "p95": - algoFunc = p95 - break - case "p99": - algoFunc = p99 - break - default: - return + if (actorBackPressures) { + for (const m of actorBackPressures.outputBufferBlockingDuration) { + if (m.sample.length > 0) { + // Note: We issue an instant query to Prometheus to get the most recent value. + // So there should be only one sample here. + // + // Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still + // possible that an old version of meta service returns a range-query result. + // So we take the one with the latest timestamp here. + const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100 + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + value + ) + } } - - const value = algoFunc(m.sample) * 100 - map.set( - `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, - value - ) } } return map } }, [ - backPressureDataSourceAlgo, + backPressureDataSource, actorBackPressures, - backPressureAlgo, backPressuresMetricsWithoutPromtheus, ]) @@ -456,9 +438,11 @@ export default function Streaming() { Back Pressure Data Source - {backPressureDataSourceAlgo === "Prometheus" && ( - - Back Pressure Algorithm - - - )} Fragments {fragmentDependencyDag && ( diff --git a/src/meta/src/dashboard/prometheus.rs b/src/meta/src/dashboard/prometheus.rs index 7baade4969749..435dd6896e6e7 100644 --- a/src/meta/src/dashboard/prometheus.rs +++ b/src/meta/src/dashboard/prometheus.rs @@ -17,7 +17,7 @@ use std::time::SystemTime; use anyhow::anyhow; use axum::{Extension, Json}; -use prometheus_http_query::response::{RangeVector, Sample}; +use prometheus_http_query::response::{InstantVector, RangeVector, Sample}; use serde::Serialize; use super::handlers::{err, DashboardError}; @@ -41,6 +41,7 @@ impl From<&Sample> for PrometheusSample { #[derive(Serialize, Debug)] pub struct PrometheusVector { metric: HashMap, + // Multiple samples from `RangeVector` or single sample from `InstantVector`. sample: Vec, } @@ -48,7 +49,16 @@ impl From<&RangeVector> for PrometheusVector { fn from(value: &RangeVector) -> Self { PrometheusVector { metric: value.metric().clone(), - sample: value.samples().iter().map(PrometheusSample::from).collect(), + sample: value.samples().iter().map(Into::into).collect(), + } + } +} + +impl From<&InstantVector> for PrometheusVector { + fn from(value: &InstantVector) -> Self { + PrometheusVector { + metric: value.metric().clone(), + sample: vec![value.sample().into()], } } } @@ -134,27 +144,12 @@ pub async fn list_prometheus_fragment_back_pressure( Extension(srv): Extension, ) -> Result> { if let Some(ref client) = srv.prometheus_client { - let now = SystemTime::now(); let back_pressure_query = format!("avg(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) / 1000000000", srv.prometheus_selector); - let result = client - .query_range( - back_pressure_query, - now.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64 - - 1800, - now.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64, - 15.0, - ) - .get() - .await - .map_err(err)?; + let result = client.query(back_pressure_query).get().await.map_err(err)?; let back_pressure_data = result .data() - .as_matrix() + .as_vector() .unwrap() .iter() .map(PrometheusVector::from)