diff --git a/dashboard/components/BackPressureTable.tsx b/dashboard/components/BackPressureTable.tsx deleted file mode 100644 index ad790c0cb680a..0000000000000 --- a/dashboard/components/BackPressureTable.tsx +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { - Table, - TableCaption, - TableContainer, - Tbody, - Td, - Th, - Thead, - Tr, -} from "@chakra-ui/react" -import { sortBy } from "lodash" -import Head from "next/head" -import { Fragment, useEffect, useState } from "react" -import useErrorToast from "../hook/useErrorToast" -import { - BackPressuresMetrics, - getActorBackPressures, -} from "../pages/api/metric" -import RateBar from "./RateBar" - -export default function BackPressureTable({ - selectedFragmentIds, -}: { - selectedFragmentIds: Set -}) { - const [backPressuresMetrics, setBackPressuresMetrics] = - useState() - const toast = useErrorToast() - - useEffect(() => { - async function doFetch() { - while (true) { - try { - let metrics = await getActorBackPressures() - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) - ) - setBackPressuresMetrics(metrics) - await new Promise((resolve) => setTimeout(resolve, 5000)) // refresh every 5 secs - } catch (e: any) { - toast(e, "warning") - break - } - } - } - doFetch() - return () => {} - }, [toast]) - - const isSelected = (fragmentId: string) => selectedFragmentIds.has(fragmentId) - - const retVal = ( - - - Back Pressures (Last 30 minutes) - - - - - - - - {backPressuresMetrics && - backPressuresMetrics.outputBufferBlockingDuration - .filter((m) => isSelected(m.metric.fragment_id)) - .map((m) => ( - - - - - ))} - -
Fragment IDs → DownstreamBlock Rate
{`Fragment ${m.metric.fragment_id} -> ${m.metric.downstream_fragment_id}`} - -
-
- ) - return ( - - - Streaming Back Pressure - - {retVal} - - ) -} diff --git a/dashboard/components/FragmentGraph.tsx b/dashboard/components/FragmentGraph.tsx index 72184d1b2a8bc..6ca2c14dda28e 100644 --- a/dashboard/components/FragmentGraph.tsx +++ b/dashboard/components/FragmentGraph.tsx @@ -479,7 +479,6 @@ export default function FragmentGraph({ - {/* */} ) } diff --git a/dashboard/pages/api/metric.ts b/dashboard/pages/api/metric.ts index 2ea62cebc4385..0eae481624b13 100644 --- a/dashboard/pages/api/metric.ts +++ b/dashboard/pages/api/metric.ts @@ -17,10 +17,12 @@ import { Metrics, MetricsSample } from "../../components/metrics" import api from "./api" +export const INTERVAL = 5000 export interface BackPressuresMetrics { outputBufferBlockingDuration: Metrics[] } +// Get back pressure from meta node -> prometheus export async function getActorBackPressures() { const res: BackPressuresMetrics = await api.get( "/metrics/actor/back_pressures" @@ -28,6 +30,134 @@ export async function getActorBackPressures() { return res } +export interface BackPressureInfo { + actorId: number + fragmentId: number + downstreamFragmentId: number + value: number +} + +export interface BackPressureRateInfo { + actorId: number + fragmentId: number + downstreamFragmentId: number + backPressureRate: number +} + +function convertToMapAndAgg( + backPressures: BackPressureInfo[] +): Map { + // FragmentId-downstreamFragmentId, total value + const mapValue = new Map() + // FragmentId-downstreamFragmentId, total count + const mapNumber = new Map() + // FragmentId-downstreamFragmentId, average value + const map = new Map() + for (const item of backPressures) { + const key = `${item.fragmentId}-${item.downstreamFragmentId}` + if (mapValue.has(key) && mapNumber.has(key)) { + // add || tp avoid NaN and pass check + mapValue.set(key, (mapValue.get(key) || 0) + item.value) + mapNumber.set(key, (mapNumber.get(key) || 0) + 1) + } else { + mapValue.set(key, item.value) + mapNumber.set(key, 1) + } + } + + for (const [key, value] of mapValue) { + map.set(key, value / mapNumber.get(key)!) + } + return map +} + +function convertFromMapAndAgg( + map: Map +): BackPressureRateInfo[] { + const result: BackPressureRateInfo[] = [] + map.forEach((value, key) => { + const [fragmentId, downstreamFragmentId] = key.split("-").map(Number) + const backPressureRateInfo: BackPressureRateInfo = { + actorId: 0, + fragmentId, + downstreamFragmentId, + backPressureRate: value, + } + result.push(backPressureRateInfo) + }) + return result +} + +function convertToBackPressureMetrics( + bpRates: BackPressureRateInfo[] +): BackPressuresMetrics { + const bpMetrics: BackPressuresMetrics = { + outputBufferBlockingDuration: [], + } + for (const item of bpRates) { + bpMetrics.outputBufferBlockingDuration.push({ + metric: { + actorId: item.actorId.toString(), + fragmentId: item.fragmentId.toString(), + downstreamFragmentId: item.downstreamFragmentId.toString(), + }, + sample: [ + { + timestamp: Date.now(), + value: item.backPressureRate, + }, + ], + }) + } + return bpMetrics +} + +export function calculateBPRate( + backPressureNew: BackPressureInfo[], + backPressureOld: BackPressureInfo[] +): BackPressuresMetrics { + let mapNew = convertToMapAndAgg(backPressureNew) + let mapOld = convertToMapAndAgg(backPressureOld) + let result = new Map() + mapNew.forEach((value, key) => { + if (mapOld.has(key)) { + result.set( + key, + // The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing + ((value - (mapOld.get(key) || 0)) / ((INTERVAL / 1000) * 1000000000)) * + 100 + ) + } else { + result.set(key, 0) + } + }) + + return convertToBackPressureMetrics(convertFromMapAndAgg(result)) +} + +export const BackPressureInfo = { + fromJSON: (object: any) => { + return { + actorId: isSet(object.actorId) ? Number(object.actorId) : 0, + fragmentId: isSet(object.fragmentId) ? Number(object.fragmentId) : 0, + downstreamFragmentId: isSet(object.downstreamFragmentId) + ? Number(object.downstreamFragmentId) + : 0, + value: isSet(object.value) ? Number(object.value) : 0, + } + }, +} + +// Get back pressure from meta node -> compute node +export async function getBackPressureWithoutPrometheus() { + const response = await api.get("/metrics/back_pressures") + let backPressureInfos: BackPressureInfo[] = response.backPressureInfos.map( + BackPressureInfo.fromJSON + ) + backPressureInfos = backPressureInfos.sort((a, b) => a.actorId - b.actorId) + return backPressureInfos +} + function calculatePercentile(samples: MetricsSample[], percentile: number) { const sorted = samples.sort((a, b) => a.value - b.value) const index = Math.floor(sorted.length * percentile) @@ -49,3 +179,7 @@ export function p95(samples: MetricsSample[]) { export function p99(samples: MetricsSample[]) { return calculatePercentile(samples, 0.99) } + +function isSet(value: any): boolean { + return value !== null && value !== undefined +} diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 244b78a540a0a..8d1330cd69828 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -29,7 +29,7 @@ import { } from "@chakra-ui/react" import * as d3 from "d3" import { dagStratify } from "d3-dag" -import _ from "lodash" +import _, { sortBy } from "lodash" import Head from "next/head" import { parseAsInteger, useQueryState } from "nuqs" import { Fragment, useCallback, useEffect, useMemo, useState } from "react" @@ -41,7 +41,18 @@ import { FragmentBox } from "../lib/layout" import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" import useFetch from "./api/fetch" -import { getActorBackPressures, p50, p90, p95, p99 } from "./api/metric" +import { + BackPressureInfo, + BackPressuresMetrics, + INTERVAL, + calculateBPRate, + getActorBackPressures, + getBackPressureWithoutPrometheus, + p50, + p90, + p95, + p99, +} from "./api/metric" import { getFragments, getStreamingJobs } from "./api/streaming" interface DispatcherNode { @@ -171,6 +182,12 @@ const SIDEBAR_WIDTH = 200 type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99"] +type BackPressureDataSource = "Embedded" | "Prometheus" +const backPressureDataSources: BackPressureDataSource[] = [ + "Embedded", + "Prometheus", +] + export default function Streaming() { const { response: relationList } = useFetch(getStreamingJobs) const { response: fragmentList } = useFetch(getFragments) @@ -178,11 +195,14 @@ export default function Streaming() { const [relationId, setRelationId] = useQueryState("id", parseAsInteger) const [backPressureAlgo, setBackPressureAlgo] = useQueryState("backPressure") const [selectedFragmentId, setSelectedFragmentId] = useState() + // used to get the data source + const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] = + useState("Embedded") const { response: actorBackPressures } = useFetch( getActorBackPressures, - 5000, - backPressureAlgo !== null + INTERVAL, + backPressureDataSourceAlgo === "Prometheus" && backPressureAlgo !== null ) const fragmentDependencyCallback = useCallback(() => { @@ -213,6 +233,39 @@ export default function Streaming() { return () => {} }, [relationId, relationList, setRelationId]) + // get back pressure rate without prometheus + const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = + useState() + const [previousBP, setPreviousBP] = useState([]) + const [currentBP, setCurrentBP] = useState([]) + const toast = useErrorToast() + + useEffect(() => { + if (backPressureDataSourceAlgo === "Embedded") { + const interval = setInterval(() => { + const fetchNewBP = async () => { + const newBP = await getBackPressureWithoutPrometheus() + setPreviousBP(currentBP) + setCurrentBP(newBP) + } + + fetchNewBP().catch(console.error) + }, INTERVAL) + return () => clearInterval(interval) + } + }, [currentBP, backPressureDataSourceAlgo]) + + useEffect(() => { + if (currentBP !== null && previousBP !== null) { + const metrics = calculateBPRate(currentBP, previousBP) + metrics.outputBufferBlockingDuration = sortBy( + metrics.outputBufferBlockingDuration, + (m) => (m.metric.fragmentId, m.metric.downstreamFragmentId) + ) + setBackPressuresMetrics(metrics) + } + }, [currentBP, previousBP]) + const fragmentDependency = fragmentDependencyCallback()?.fragmentDep const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag const fragments = fragmentDependencyCallback()?.fragments @@ -239,8 +292,6 @@ export default function Streaming() { const [searchActorId, setSearchActorId] = useState("") const [searchFragId, setSearchFragId] = useState("") - const toast = useErrorToast() - const handleSearchFragment = () => { const searchFragIdInt = parseInt(searchFragId) if (fragmentList) { @@ -279,38 +330,60 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if (actorBackPressures && backPressureAlgo) { + if ( + (actorBackPressures && backPressureAlgo && backPressureAlgo) || + backPressuresMetricsWithoutPromtheus + ) { let map = new Map() - for (const m of actorBackPressures.outputBufferBlockingDuration) { - console.log(backPressureAlgo) - let algoFunc - switch (backPressureAlgo) { - case "p50": - algoFunc = p50 - break - case "p90": - algoFunc = p90 - break - case "p95": - algoFunc = p95 - break - case "p99": - algoFunc = p99 - break - default: - return + if ( + backPressureDataSourceAlgo === "Embedded" && + backPressuresMetricsWithoutPromtheus + ) { + for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { + map.set( + `${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`, + m.sample[0].value + ) } + } else if ( + backPressureDataSourceAlgo !== "Embedded" && + actorBackPressures + ) { + for (const m of actorBackPressures.outputBufferBlockingDuration) { + let algoFunc + switch (backPressureAlgo) { + case "p50": + algoFunc = p50 + break + case "p90": + algoFunc = p90 + break + case "p95": + algoFunc = p95 + break + case "p99": + algoFunc = p99 + break + default: + return + } - const value = algoFunc(m.sample) * 100 - map.set( - `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, - value - ) + const value = algoFunc(m.sample) * 100 + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + value + ) + } } return map } - }, [actorBackPressures, backPressureAlgo]) + }, [ + backPressureDataSourceAlgo, + actorBackPressures, + backPressureAlgo, + backPressuresMetricsWithoutPromtheus, + ]) const retVal = ( @@ -381,25 +454,40 @@ export default function Streaming() { - Back Pressure + Back Pressure Data Source + {backPressureDataSourceAlgo === "Prometheus" && ( + + Back Pressure Algorithm + + + )} Fragments {fragmentDependencyDag && (