Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dashboard): only show real-time back-pressure rate from Prometheus data source #15380

Merged
merged 3 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 30 additions & 66 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ import {
calculateBPRate,
getActorBackPressures,
getBackPressureWithoutPrometheus,
p50,
p90,
p95,
p99,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
Expand Down Expand Up @@ -179,9 +175,6 @@ function buildFragmentDependencyAsEdges(

const SIDEBAR_WIDTH = 200

type BackPressureAlgo = "p50" | "p90" | "p95" | "p99"
const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99"]

type BackPressureDataSource = "Embedded" | "Prometheus"
const backPressureDataSources: BackPressureDataSource[] = [
"Embedded",
Expand All @@ -193,16 +186,15 @@ export default function Streaming() {
const { response: fragmentList } = useFetch(getFragments)

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [backPressureAlgo, setBackPressureAlgo] = useQueryState("backPressure")
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
// used to get the data source
const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] =
useState("Embedded")
const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")

const { response: actorBackPressures } = useFetch(
getActorBackPressures,
INTERVAL,
backPressureDataSourceAlgo === "Prometheus" && backPressureAlgo !== null
backPressureDataSource === "Prometheus"
)

const fragmentDependencyCallback = useCallback(() => {
Expand Down Expand Up @@ -234,14 +226,16 @@ export default function Streaming() {
}, [relationId, relationList, setRelationId])

// get back pressure rate without prometheus
// TODO(bugen): extract the following logic to a hook and unify the interface
// with Prometheus data source.
const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] =
useState<BackPressuresMetrics>()
const [previousBP, setPreviousBP] = useState<BackPressureInfo[]>([])
const [currentBP, setCurrentBP] = useState<BackPressureInfo[]>([])
const toast = useErrorToast()

useEffect(() => {
if (backPressureDataSourceAlgo === "Embedded") {
if (backPressureDataSource === "Embedded") {
const interval = setInterval(() => {
const fetchNewBP = async () => {
const newBP = await getBackPressureWithoutPrometheus()
Expand All @@ -253,7 +247,7 @@ export default function Streaming() {
}, INTERVAL)
return () => clearInterval(interval)
}
}, [currentBP, backPressureDataSourceAlgo])
}, [currentBP, backPressureDataSource])

useEffect(() => {
if (currentBP !== null && previousBP !== null) {
Expand Down Expand Up @@ -330,14 +324,11 @@ export default function Streaming() {
}

const backPressures = useMemo(() => {
if (
(actorBackPressures && backPressureAlgo && backPressureAlgo) ||
backPressuresMetricsWithoutPromtheus
) {
if (actorBackPressures || backPressuresMetricsWithoutPromtheus) {
let map = new Map()

if (
backPressureDataSourceAlgo === "Embedded" &&
backPressureDataSource === "Embedded" &&
backPressuresMetricsWithoutPromtheus
) {
for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) {
Expand All @@ -347,41 +338,32 @@ export default function Streaming() {
)
}
} else if (
backPressureDataSourceAlgo !== "Embedded" &&
backPressureDataSource === "Prometheus" &&
actorBackPressures
) {
for (const m of actorBackPressures.outputBufferBlockingDuration) {
let algoFunc
switch (backPressureAlgo) {
case "p50":
algoFunc = p50
break
case "p90":
algoFunc = p90
break
case "p95":
algoFunc = p95
break
case "p99":
algoFunc = p99
break
default:
return
if (actorBackPressures) {
for (const m of actorBackPressures.outputBufferBlockingDuration) {
if (m.sample.length > 0) {
// Note: We issue an instant query to Prometheus to get the most recent value.
// So there should be only one sample here.
//
// Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
// possible that an old version of meta service returns a range-query result.
// So we take the one with the latest timestamp here.
const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
}

const value = algoFunc(m.sample) * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
}
return map
}
}, [
backPressureDataSourceAlgo,
backPressureDataSource,
actorBackPressures,
backPressureAlgo,
backPressuresMetricsWithoutPromtheus,
])

Expand Down Expand Up @@ -456,9 +438,11 @@ export default function Streaming() {
<FormControl>
<FormLabel>Back Pressure Data Source</FormLabel>
<Select
value={backPressureDataSourceAlgo}
value={backPressureDataSource}
onChange={(event) =>
setBackPressureDataSourceAlgo(event.target.value)
setBackPressureDataSource(
event.target.value as BackPressureDataSource
)
}
>
{backPressureDataSources.map((algo) => (
Expand All @@ -468,26 +452,6 @@ export default function Streaming() {
))}
</Select>
</FormControl>
{backPressureDataSourceAlgo === "Prometheus" && (
<FormControl>
<FormLabel>Back Pressure Algorithm</FormLabel>
<Select
value={backPressureAlgo ?? undefined}
onChange={(event) => {
setBackPressureAlgo(event.target.value as BackPressureAlgo)
}}
>
<option value="" disabled selected hidden>
Please select
</option>
{backPressureAlgos.map((algo) => (
<option value={algo} key={algo}>
{algo}
</option>
))}
</Select>
</FormControl>
)}
<Flex height="full" width="full" flexDirection="column">
<Text fontWeight="semibold">Fragments</Text>
{fragmentDependencyDag && (
Expand Down
33 changes: 14 additions & 19 deletions src/meta/src/dashboard/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::SystemTime;

use anyhow::anyhow;
use axum::{Extension, Json};
use prometheus_http_query::response::{RangeVector, Sample};
use prometheus_http_query::response::{InstantVector, RangeVector, Sample};
use serde::Serialize;

use super::handlers::{err, DashboardError};
Expand All @@ -41,14 +41,24 @@ impl From<&Sample> for PrometheusSample {
/// One Prometheus series (label set plus samples) in a serializable form.
///
/// Constructed through the `From<&RangeVector>` and `From<&InstantVector>`
/// conversions, so both query result shapes share this one representation.
#[derive(Serialize, Debug)]
pub struct PrometheusVector {
// Label set of the series, cloned from the source vector's `metric()`.
metric: HashMap<String, String>,
// Samples of the series: every sample of a `RangeVector`, or exactly one
// element when converted from an `InstantVector`.
sample: Vec<PrometheusSample>,
}

/// Converts a range-query series into the dashboard representation.
///
/// The label set is cloned and every sample of the range vector is converted
/// into a `PrometheusSample`, preserving the order returned by `samples()`.
impl From<&RangeVector> for PrometheusVector {
    fn from(value: &RangeVector) -> Self {
        PrometheusVector {
            metric: value.metric().clone(),
            // `sample` must be initialized exactly once; the element type is
            // inferred from the field, so `Into::into` resolves to the
            // `From<&Sample> for PrometheusSample` conversion.
            sample: value.samples().iter().map(Into::into).collect(),
        }
    }
}

impl From<&InstantVector> for PrometheusVector {
fn from(value: &InstantVector) -> Self {
PrometheusVector {
metric: value.metric().clone(),
sample: vec![value.sample().into()],
}
}
}
Expand Down Expand Up @@ -134,27 +144,12 @@ pub async fn list_prometheus_fragment_back_pressure(
Extension(srv): Extension<Service>,
) -> Result<Json<FragmentBackPressure>> {
if let Some(ref client) = srv.prometheus_client {
let now = SystemTime::now();
let back_pressure_query =
format!("avg(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) / 1000000000", srv.prometheus_selector);
let result = client
.query_range(
back_pressure_query,
now.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64
- 1800,
now.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
15.0,
)
.get()
.await
.map_err(err)?;
let result = client.query(back_pressure_query).get().await.map_err(err)?;
let back_pressure_data = result
.data()
.as_matrix()
.as_vector()
.unwrap()
.iter()
.map(PrometheusVector::from)
Expand Down
Loading