diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index e5179e69fdce4..4b2704adc9592 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -45,10 +45,6 @@ import { calculateBPRate, getActorBackPressures, getBackPressureWithoutPrometheus, - p50, - p90, - p95, - p99, } from "../lib/api/metric" import { getFragments, getStreamingJobs } from "../lib/api/streaming" import { FragmentBox } from "../lib/layout" @@ -179,9 +175,6 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 200 -type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" -const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99"] - type BackPressureDataSource = "Embedded" | "Prometheus" const backPressureDataSources: BackPressureDataSource[] = [ "Embedded", @@ -193,16 +186,15 @@ export default function Streaming() { const { response: fragmentList } = useFetch(getFragments) const [relationId, setRelationId] = useQueryState("id", parseAsInteger) - const [backPressureAlgo, setBackPressureAlgo] = useQueryState("backPressure") const [selectedFragmentId, setSelectedFragmentId] = useState() // used to get the data source - const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] = - useState("Embedded") + const [backPressureDataSource, setBackPressureDataSource] = + useState("Embedded") const { response: actorBackPressures } = useFetch( getActorBackPressures, INTERVAL, - backPressureDataSourceAlgo === "Prometheus" && backPressureAlgo !== null + backPressureDataSource === "Prometheus" ) const fragmentDependencyCallback = useCallback(() => { @@ -234,6 +226,8 @@ export default function Streaming() { }, [relationId, relationList, setRelationId]) // get back pressure rate without prometheus + // TODO(bugen): extract the following logic to a hook and unify the interface + // with Prometheus data source. const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = useState() const [previousBP, setPreviousBP] = useState([]) @@ -241,7 +235,7 @@ export default function Streaming() { const toast = useErrorToast() useEffect(() => { - if (backPressureDataSourceAlgo === "Embedded") { + if (backPressureDataSource === "Embedded") { const interval = setInterval(() => { const fetchNewBP = async () => { const newBP = await getBackPressureWithoutPrometheus() @@ -253,7 +247,7 @@ export default function Streaming() { }, INTERVAL) return () => clearInterval(interval) } - }, [currentBP, backPressureDataSourceAlgo]) + }, [currentBP, backPressureDataSource]) useEffect(() => { if (currentBP !== null && previousBP !== null) { @@ -330,14 +324,11 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if ( - (actorBackPressures && backPressureAlgo && backPressureAlgo) || - backPressuresMetricsWithoutPromtheus - ) { + if (actorBackPressures || backPressuresMetricsWithoutPromtheus) { let map = new Map() if ( - backPressureDataSourceAlgo === "Embedded" && + backPressureDataSource === "Embedded" && backPressuresMetricsWithoutPromtheus ) { for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { @@ -347,41 +338,29 @@ export default function Streaming() { ) } } else if ( - backPressureDataSourceAlgo !== "Embedded" && + backPressureDataSource === "Prometheus" && actorBackPressures ) { - for (const m of actorBackPressures.outputBufferBlockingDuration) { - let algoFunc - switch (backPressureAlgo) { - case "p50": - algoFunc = p50 - break - case "p90": - algoFunc = p90 - break - case "p95": - algoFunc = p95 - break - case "p99": - algoFunc = p99 - break - default: - return + if (actorBackPressures) { + for (const m of actorBackPressures.outputBufferBlockingDuration) { + if (m.sample.length > 0) { + // We issue an instant query to Prometheus to get the most recent value. + // So there should be only one sample here. + console.assert(m.sample.length === 1) + const value = m.sample[0].value * 100 + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + value + ) + } } - - const value = algoFunc(m.sample) * 100 - map.set( - `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, - value - ) } } return map } }, [ - backPressureDataSourceAlgo, + backPressureDataSource, actorBackPressures, - backPressureAlgo, backPressuresMetricsWithoutPromtheus, ]) @@ -456,9 +435,11 @@ export default function Streaming() { Back Pressure Data Source - {backPressureDataSourceAlgo === "Prometheus" && ( - - Back Pressure Algorithm - - - )} Fragments {fragmentDependencyDag && (