From bd213e569a8bec405c823badb999afacabf5dd3c Mon Sep 17 00:00:00 2001 From: yufansong Date: Thu, 25 Jan 2024 20:02:47 -0800 Subject: [PATCH 01/14] add frontend test code --- dashboard/components/Layout.tsx | 1 + dashboard/mock-server.js | 4 +++ dashboard/mock/fetch.sh | 1 + dashboard/pages/api/metric.ts | 42 +++++++++++++++++++++++++ dashboard/pages/back_pressure_rates.tsx | 24 ++++++++++++++ 5 files changed, 72 insertions(+) create mode 100644 dashboard/pages/back_pressure_rates.tsx diff --git a/dashboard/components/Layout.tsx b/dashboard/components/Layout.tsx index 7be9f727bddec..a1aacc3ca8995 100644 --- a/dashboard/components/Layout.tsx +++ b/dashboard/components/Layout.tsx @@ -145,6 +145,7 @@ function Layout({ children }: { children: React.ReactNode }) { Streaming Dependency Graph Fragment Graph + Back Pressure Rates
Batch diff --git a/dashboard/mock-server.js b/dashboard/mock-server.js index 31d67bd87b772..11365f33ddc07 100644 --- a/dashboard/mock-server.js +++ b/dashboard/mock-server.js @@ -80,3 +80,7 @@ app.get("/metrics/actor/back_pressures", (req, res, next) => { app.get("/monitor/await_tree/1", (req, res, next) => { res.json(require("./mock/await_tree_1.json")) }) + +app.get("/metrics/back_pressures", (req, res, next) => { + res.json(require("./mock/back_pressures.json")) +}) diff --git a/dashboard/mock/fetch.sh b/dashboard/mock/fetch.sh index 5cc278e01d0c5..baf86c3dc4ab7 100755 --- a/dashboard/mock/fetch.sh +++ b/dashboard/mock/fetch.sh @@ -18,4 +18,5 @@ curl http://localhost:5691/api/sinks > sinks.json curl http://localhost:5691/api/sources > sources.json curl http://localhost:5691/api/metrics/cluster > metrics_cluster.json curl http://localhost:5691/api/metrics/actor/back_pressures > actor_back_pressures.json +curl http://localhost:5691/api/metrics/back_pressures > back_pressures.json curl http://localhost:5691/api/monitor/await_tree/1 > await_tree_1.json diff --git a/dashboard/pages/api/metric.ts b/dashboard/pages/api/metric.ts index 2ea62cebc4385..2423361b8006f 100644 --- a/dashboard/pages/api/metric.ts +++ b/dashboard/pages/api/metric.ts @@ -15,12 +15,14 @@ * */ import { Metrics, MetricsSample } from "../../components/metrics" +import { Field } from "../../proto/gen/plan_common" import api from "./api" export interface BackPressuresMetrics { outputBufferBlockingDuration: Metrics[] } +// Get back pressure from meta node -> prometheus export async function getActorBackPressures() { const res: BackPressuresMetrics = await api.get( "/metrics/actor/back_pressures" @@ -28,6 +30,42 @@ export async function getActorBackPressures() { return res } +export interface BackPressureInfo { + id: number; + name: string; + owner: number; + columns: Field[]; + actorId: number, + fragementId: number, + donwStreamFragmentId: number, + value: number, +} + +export const BackPressureInfo = { + fromJSON: (object: any) => { + return { + id: 0, + name: "", + owner: 0, + columns: [], + actorId: isSet(object.actorId) ? Number(object.actorId) : 0, + fragementId: isSet(object.fragementId) ? Number(object.fragementId) : 0, + donwStreamFragmentId: isSet(object.donwStreamFragmentId) ? Number(object.donwStreamFragmentId) : 0, + value: isSet(object.value) ? Number(object.value) : 0, + } + }, +} + +// Get back pressure from meta node -> compute node +export async function getComputeBackPressures() { + const response = await api.get("/metrics/back_pressures"); + + let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map(BackPressureInfo.fromJSON) + + back_pressure_infos = back_pressure_infos.sort((a, b) => a.actorId - b.actorId) + return back_pressure_infos +} + function calculatePercentile(samples: MetricsSample[], percentile: number) { const sorted = samples.sort((a, b) => a.value - b.value) const index = Math.floor(sorted.length * percentile) @@ -49,3 +87,7 @@ export function p95(samples: MetricsSample[]) { export function p99(samples: MetricsSample[]) { return calculatePercentile(samples, 0.99) } + +function isSet(value: any): boolean { + return value !== null && value !== undefined; +} \ No newline at end of file diff --git a/dashboard/pages/back_pressure_rates.tsx b/dashboard/pages/back_pressure_rates.tsx new file mode 100644 index 0000000000000..8c163bf8f106f --- /dev/null +++ b/dashboard/pages/back_pressure_rates.tsx @@ -0,0 +1,24 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +import { + Relations, +} from "../components/Relations" +import { BackPressureInfo, getComputeBackPressures } from "./api/metric" + +export default function BackPressureRates() { + return Relations("Back Pressure Rates", getComputeBackPressures, []) +} \ No newline at end of file From 9ae89d7a6a408a093b1d33eaaa4fa133ca880484 Mon Sep 17 00:00:00 2001 From: yufansong Date: Tue, 30 Jan 2024 21:11:49 -0800 Subject: [PATCH 02/14] refactor frontend code, adopt to existing streaming graph --- .../BackPressureTableWithoutPrometheus.tsx | 111 ++++++++++++++++ dashboard/components/Layout.tsx | 1 - dashboard/pages/api/metric.ts | 122 ++++++++++++++---- dashboard/pages/back_pressure_rates.tsx | 24 ---- dashboard/pages/fragment_graph.tsx | 110 +++++++++++----- 5 files changed, 287 insertions(+), 81 deletions(-) create mode 100644 dashboard/components/BackPressureTableWithoutPrometheus.tsx delete mode 100644 dashboard/pages/back_pressure_rates.tsx diff --git a/dashboard/components/BackPressureTableWithoutPrometheus.tsx b/dashboard/components/BackPressureTableWithoutPrometheus.tsx new file mode 100644 index 0000000000000..02b949a61d94a --- /dev/null +++ b/dashboard/components/BackPressureTableWithoutPrometheus.tsx @@ -0,0 +1,111 @@ +/* + * Copyright 2024 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { + Table, + TableCaption, + TableContainer, + Tbody, + Td, + Th, + Thead, + Tr, +} from "@chakra-ui/react" +import { sortBy } from "lodash" +import Head from "next/head" +import { Fragment, useEffect, useState } from "react" +import useErrorToast from "../hook/useErrorToast" +import RateBar from "./RateBar" +import { BackPressureInfo, getBackPressureWithoutPrometheus, BackPressuresMetrics, calculateBPRate, INTERVAL } from "../pages/api/metric" + +export default function BackPressureTableWithoutPrometheus({ + selectedFragmentIds, +}: { + selectedFragmentIds: Set +}) { + const [backPressuresMetrics, setBackPressuresMetrics] = + useState() + const [previousBP, setPreviousBP] = useState([]) + const toast = useErrorToast() + + useEffect(() => { + let localPreviousBP = previousBP + + async function doFetch() { + while (true) { + try { + const currentBP = await getBackPressureWithoutPrometheus() + const metrics = calculateBPRate(currentBP, localPreviousBP) + setBackPressuresMetrics(metrics) + localPreviousBP = currentBP + setPreviousBP(currentBP) + + metrics.outputBufferBlockingDuration = sortBy( + metrics.outputBufferBlockingDuration, + (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) + ) + setBackPressuresMetrics(metrics) + await new Promise((resolve) => setTimeout(resolve, INTERVAL)) // refresh every 5 secs + } catch (e: any) { + toast(e, "warning") + break + } + } + } + doFetch() + return () => { } + }, [toast]) + + const isSelected = (fragmentId: string) => selectedFragmentIds.has(fragmentId) + + const retVal = ( + + + Back Pressures (Last 30 minutes) + + + + + + + + {backPressuresMetrics && + backPressuresMetrics.outputBufferBlockingDuration + .filter((m) => isSelected(m.metric.fragment_id)) + .map((m) => ( + + + + + ))} + +
Fragment IDs → DownstreamBlock Rate
{`Fragment ${m.metric.fragment_id} -> ${m.metric.downstream_fragment_id}`} + +
+
+ ) + return ( + + + Streaming Back Pressure + + {retVal} + + ) +} diff --git a/dashboard/components/Layout.tsx b/dashboard/components/Layout.tsx index a1aacc3ca8995..7be9f727bddec 100644 --- a/dashboard/components/Layout.tsx +++ b/dashboard/components/Layout.tsx @@ -145,7 +145,6 @@ function Layout({ children }: { children: React.ReactNode }) { Streaming Dependency Graph Fragment Graph - Back Pressure Rates
Batch diff --git a/dashboard/pages/api/metric.ts b/dashboard/pages/api/metric.ts index 2423361b8006f..cc258fe2d920b 100644 --- a/dashboard/pages/api/metric.ts +++ b/dashboard/pages/api/metric.ts @@ -15,9 +15,9 @@ * */ import { Metrics, MetricsSample } from "../../components/metrics" -import { Field } from "../../proto/gen/plan_common" import api from "./api" +export const INTERVAL = 20000 export interface BackPressuresMetrics { outputBufferBlockingDuration: Metrics[] } @@ -25,44 +25,118 @@ export interface BackPressuresMetrics { // Get back pressure from meta node -> prometheus export async function getActorBackPressures() { const res: BackPressuresMetrics = await api.get( - "/metrics/actor/back_pressures" + "/metrics/actor/back_pressures", ) return res } export interface BackPressureInfo { - id: number; - name: string; - owner: number; - columns: Field[]; - actorId: number, - fragementId: number, - donwStreamFragmentId: number, - value: number, + actorId: number + fragmentId: number + downstreamFragmentId: number + value: number +} + +export interface BackPressureRateInfo { + actorId: number + fragmentId: number + downstreamFragmentId: number + backPressureRate: number +} + +function convertToMapAndAgg(back_pressures: BackPressureInfo[]): Map { + const map = new Map() + for (const item of back_pressures) { + const key = `${item.fragmentId}-${item.downstreamFragmentId}` + if (map.has(key)) { + map.set(key, map.get(key) + item.value) + } else { + map.set(key, item.value) + } + } + return map +} + +function convertFromMapAndAgg(map: Map): BackPressureRateInfo[] { + const result: BackPressureRateInfo[] = [] + map.forEach((value, key) => { + const [fragmentId, downstreamFragmentId] = key + .split("-") + .map(Number) + const backPressureRateInfo: BackPressureRateInfo = { + actorId: 0, + fragmentId, + downstreamFragmentId, + backPressureRate: value, + } + result.push(backPressureRateInfo) + }) + return result +} + +function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPressuresMetrics { + const bp_metrics: BackPressuresMetrics = { + outputBufferBlockingDuration: [], + } + for (const item of bp_rates) { + bp_metrics.outputBufferBlockingDuration.push({ + metric: { + actor_id: item.actorId.toString(), + fragment_id: item.fragmentId.toString(), + downstream_fragment_id: item.downstreamFragmentId.toString(), + }, + sample: [{ + timestamp: Date.now() + , value: item.backPressureRate + }], + }) + } + return bp_metrics +} + +export function calculateBPRate( + back_pressure_new: BackPressureInfo[], + back_pressure_old: BackPressureInfo[], +): BackPressuresMetrics { + let map_new = convertToMapAndAgg(back_pressure_new) + let map_old = convertToMapAndAgg(back_pressure_old) + let result = new Map() + map_new.forEach((value, key) => { + if (map_old.has(key)) { + result.set( + key, + (value - map_old.get(key)) / (INTERVAL * 1000000000), + ) + } else { + result.set(key, 0) + } + }) + + return convertToBackPressureMetrics(convertFromMapAndAgg(result)) } export const BackPressureInfo = { fromJSON: (object: any) => { return { - id: 0, - name: "", - owner: 0, - columns: [], actorId: isSet(object.actorId) ? Number(object.actorId) : 0, - fragementId: isSet(object.fragementId) ? Number(object.fragementId) : 0, - donwStreamFragmentId: isSet(object.donwStreamFragmentId) ? Number(object.donwStreamFragmentId) : 0, + fragmentId: isSet(object.fragmentId) ? Number(object.fragmentId) : 0, + downstreamFragmentId: isSet(object.downstreamFragmentId) + ? Number(object.downstreamFragmentId) + : 0, value: isSet(object.value) ? Number(object.value) : 0, } }, } // Get back pressure from meta node -> compute node -export async function getComputeBackPressures() { - const response = await api.get("/metrics/back_pressures"); - - let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map(BackPressureInfo.fromJSON) - - back_pressure_infos = back_pressure_infos.sort((a, b) => a.actorId - b.actorId) +export async function getBackPressureWithoutPrometheus() { + const response = await api.get("/metrics/back_pressures") + let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map( + BackPressureInfo.fromJSON, + ) + back_pressure_infos = back_pressure_infos.sort( + (a, b) => a.actorId - b.actorId, + ) return back_pressure_infos } @@ -89,5 +163,5 @@ export function p99(samples: MetricsSample[]) { } function isSet(value: any): boolean { - return value !== null && value !== undefined; -} \ No newline at end of file + return value !== null && value !== undefined +} diff --git a/dashboard/pages/back_pressure_rates.tsx b/dashboard/pages/back_pressure_rates.tsx deleted file mode 100644 index 8c163bf8f106f..0000000000000 --- a/dashboard/pages/back_pressure_rates.tsx +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2023 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -import { - Relations, -} from "../components/Relations" -import { BackPressureInfo, getComputeBackPressures } from "./api/metric" - -export default function BackPressureRates() { - return Relations("Back Pressure Rates", getComputeBackPressures, []) -} \ No newline at end of file diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 244b78a540a0a..54eefc0760823 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -41,8 +41,9 @@ import { FragmentBox } from "../lib/layout" import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" import useFetch from "./api/fetch" -import { getActorBackPressures, p50, p90, p95, p99 } from "./api/metric" +import { BackPressureInfo, getBackPressureWithoutPrometheus, BackPressuresMetrics, getActorBackPressures, BackPressureRateInfo, p50, p90, p95, p99, calculateBPRate, INTERVAL } from "./api/metric" import { getFragments, getStreamingJobs } from "./api/streaming" +import { sortBy } from "lodash" interface DispatcherNode { [actorId: number]: Dispatcher[] @@ -168,8 +169,8 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 200 -type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" -const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99"] +type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" | "current" +const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99", "current"] export default function Streaming() { const { response: relationList } = useFetch(getStreamingJobs) @@ -181,7 +182,7 @@ export default function Streaming() { const { response: actorBackPressures } = useFetch( getActorBackPressures, - 5000, + INTERVAL, backPressureAlgo !== null ) @@ -210,9 +211,47 @@ export default function Streaming() { } } } - return () => {} + return () => { } }, [relationId, relationList, setRelationId]) + + // get back pressure rate without prometheus + const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = + useState() + const [previousBP, setPreviousBP] = useState([]) + const [backPressureRate, setBackPressureRate] = useState< + BackPressureRateInfo[] + >([]) + const toast = useErrorToast() + + useEffect(() => { + let localPreviousBP = previousBP + + async function doFetch() { + while (true) { + try { + const currentBP = await getBackPressureWithoutPrometheus() + const metrics = calculateBPRate(currentBP, localPreviousBP) + setBackPressureRate(backPressureRate) + localPreviousBP = currentBP + setPreviousBP(currentBP) + metrics.outputBufferBlockingDuration = sortBy( + metrics.outputBufferBlockingDuration, + (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) + ) + + setBackPressuresMetrics(metrics) + await new Promise((resolve) => setTimeout(resolve, INTERVAL)) + } catch (e: any) { + toast(e, "warning") + break + } + } + } + doFetch() + return () => { } + }, [toast]) + const fragmentDependency = fragmentDependencyCallback()?.fragmentDep const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag const fragments = fragmentDependencyCallback()?.fragments @@ -239,8 +278,6 @@ export default function Streaming() { const [searchActorId, setSearchActorId] = useState("") const [searchFragId, setSearchFragId] = useState("") - const toast = useErrorToast() - const handleSearchFragment = () => { const searchFragIdInt = parseInt(searchFragId) if (fragmentList) { @@ -279,38 +316,47 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if (actorBackPressures && backPressureAlgo) { + if ((actorBackPressures || backPressuresMetricsWithoutPromtheus) && backPressureAlgo) { let map = new Map() - for (const m of actorBackPressures.outputBufferBlockingDuration) { - console.log(backPressureAlgo) - let algoFunc - switch (backPressureAlgo) { - case "p50": - algoFunc = p50 - break - case "p90": - algoFunc = p90 - break - case "p95": - algoFunc = p95 - break - case "p99": - algoFunc = p99 - break - default: - return + if (backPressureAlgo === "current" && backPressuresMetricsWithoutPromtheus) { + for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + m.sample[0].value + ) } + } else if (backPressureAlgo !== "current" && actorBackPressures) { + for (const m of actorBackPressures.outputBufferBlockingDuration) { + console.log(backPressureAlgo) + let algoFunc + switch (backPressureAlgo) { + case "p50": + algoFunc = p50 + break + case "p90": + algoFunc = p90 + break + case "p95": + algoFunc = p95 + break + case "p99": + algoFunc = p99 + break + default: + return + } - const value = algoFunc(m.sample) * 100 - map.set( - `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, - value - ) + const value = algoFunc(m.sample) * 100 + map.set( + `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, + value + ) + } } return map } - }, [actorBackPressures, backPressureAlgo]) + }, [actorBackPressures, backPressureAlgo, backPressuresMetricsWithoutPromtheus]) const retVal = ( From 62c0beaba68698b51b1e8097ee0ad5d1e8ded59c Mon Sep 17 00:00:00 2001 From: yufansong Date: Fri, 2 Feb 2024 01:46:51 -0800 Subject: [PATCH 03/14] apply suggestions --- dashboard/components/BackPressureTable.tsx | 106 ------------ .../BackPressureTableWithoutPrometheus.tsx | 111 ------------- dashboard/components/FragmentGraph.tsx | 1 - dashboard/mock-server.js | 4 - dashboard/mock/fetch.sh | 1 - dashboard/pages/api/metric.ts | 56 ++++--- dashboard/pages/fragment_graph.tsx | 151 ++++++++++++------ 7 files changed, 141 insertions(+), 289 deletions(-) delete mode 100644 dashboard/components/BackPressureTable.tsx delete mode 100644 dashboard/components/BackPressureTableWithoutPrometheus.tsx diff --git a/dashboard/components/BackPressureTable.tsx b/dashboard/components/BackPressureTable.tsx deleted file mode 100644 index ad790c0cb680a..0000000000000 --- a/dashboard/components/BackPressureTable.tsx +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { - Table, - TableCaption, - TableContainer, - Tbody, - Td, - Th, - Thead, - Tr, -} from "@chakra-ui/react" -import { sortBy } from "lodash" -import Head from "next/head" -import { Fragment, useEffect, useState } from "react" -import useErrorToast from "../hook/useErrorToast" -import { - BackPressuresMetrics, - getActorBackPressures, -} from "../pages/api/metric" -import RateBar from "./RateBar" - -export default function BackPressureTable({ - selectedFragmentIds, -}: { - selectedFragmentIds: Set -}) { - const [backPressuresMetrics, setBackPressuresMetrics] = - useState() - const toast = useErrorToast() - - useEffect(() => { - async function doFetch() { - while (true) { - try { - let metrics = await getActorBackPressures() - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) - ) - setBackPressuresMetrics(metrics) - await new Promise((resolve) => setTimeout(resolve, 5000)) // refresh every 5 secs - } catch (e: any) { - toast(e, "warning") - break - } - } - } - doFetch() - return () => {} - }, [toast]) - - const isSelected = (fragmentId: string) => selectedFragmentIds.has(fragmentId) - - const retVal = ( - - - Back Pressures (Last 30 minutes) - - - - - - - - {backPressuresMetrics && - backPressuresMetrics.outputBufferBlockingDuration - .filter((m) => isSelected(m.metric.fragment_id)) - .map((m) => ( - - - - - ))} - -
Fragment IDs → DownstreamBlock Rate
{`Fragment ${m.metric.fragment_id} -> ${m.metric.downstream_fragment_id}`} - -
-
- ) - return ( - - - Streaming Back Pressure - - {retVal} - - ) -} diff --git a/dashboard/components/BackPressureTableWithoutPrometheus.tsx b/dashboard/components/BackPressureTableWithoutPrometheus.tsx deleted file mode 100644 index 02b949a61d94a..0000000000000 --- a/dashboard/components/BackPressureTableWithoutPrometheus.tsx +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { - Table, - TableCaption, - TableContainer, - Tbody, - Td, - Th, - Thead, - Tr, -} from "@chakra-ui/react" -import { sortBy } from "lodash" -import Head from "next/head" -import { Fragment, useEffect, useState } from "react" -import useErrorToast from "../hook/useErrorToast" -import RateBar from "./RateBar" -import { BackPressureInfo, getBackPressureWithoutPrometheus, BackPressuresMetrics, calculateBPRate, INTERVAL } from "../pages/api/metric" - -export default function BackPressureTableWithoutPrometheus({ - selectedFragmentIds, -}: { - selectedFragmentIds: Set -}) { - const [backPressuresMetrics, setBackPressuresMetrics] = - useState() - const [previousBP, setPreviousBP] = useState([]) - const toast = useErrorToast() - - useEffect(() => { - let localPreviousBP = previousBP - - async function doFetch() { - while (true) { - try { - const currentBP = await getBackPressureWithoutPrometheus() - const metrics = calculateBPRate(currentBP, localPreviousBP) - setBackPressuresMetrics(metrics) - localPreviousBP = currentBP - setPreviousBP(currentBP) - - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) - ) - setBackPressuresMetrics(metrics) - await new Promise((resolve) => setTimeout(resolve, INTERVAL)) // refresh every 5 secs - } catch (e: any) { - toast(e, "warning") - break - } - } - } - doFetch() - return () => { } - }, [toast]) - - const isSelected = (fragmentId: string) => selectedFragmentIds.has(fragmentId) - - const retVal = ( - - - Back Pressures (Last 30 minutes) - - - - - - - - {backPressuresMetrics && - backPressuresMetrics.outputBufferBlockingDuration - .filter((m) => isSelected(m.metric.fragment_id)) - .map((m) => ( - - - - - ))} - -
Fragment IDs → DownstreamBlock Rate
{`Fragment ${m.metric.fragment_id} -> ${m.metric.downstream_fragment_id}`} - -
-
- ) - return ( - - - Streaming Back Pressure - - {retVal} - - ) -} diff --git a/dashboard/components/FragmentGraph.tsx b/dashboard/components/FragmentGraph.tsx index 72184d1b2a8bc..6ca2c14dda28e 100644 --- a/dashboard/components/FragmentGraph.tsx +++ b/dashboard/components/FragmentGraph.tsx @@ -479,7 +479,6 @@ export default function FragmentGraph({ - {/* */} ) } diff --git a/dashboard/mock-server.js b/dashboard/mock-server.js index 11365f33ddc07..31d67bd87b772 100644 --- a/dashboard/mock-server.js +++ b/dashboard/mock-server.js @@ -80,7 +80,3 @@ app.get("/metrics/actor/back_pressures", (req, res, next) => { app.get("/monitor/await_tree/1", (req, res, next) => { res.json(require("./mock/await_tree_1.json")) }) - -app.get("/metrics/back_pressures", (req, res, next) => { - res.json(require("./mock/back_pressures.json")) -}) diff --git a/dashboard/mock/fetch.sh b/dashboard/mock/fetch.sh index baf86c3dc4ab7..5cc278e01d0c5 100755 --- a/dashboard/mock/fetch.sh +++ b/dashboard/mock/fetch.sh @@ -18,5 +18,4 @@ curl http://localhost:5691/api/sinks > sinks.json curl http://localhost:5691/api/sources > sources.json curl http://localhost:5691/api/metrics/cluster > metrics_cluster.json curl http://localhost:5691/api/metrics/actor/back_pressures > actor_back_pressures.json -curl http://localhost:5691/api/metrics/back_pressures > back_pressures.json curl http://localhost:5691/api/monitor/await_tree/1 > await_tree_1.json diff --git a/dashboard/pages/api/metric.ts b/dashboard/pages/api/metric.ts index cc258fe2d920b..7fad567699cf8 100644 --- a/dashboard/pages/api/metric.ts +++ b/dashboard/pages/api/metric.ts @@ -17,15 +17,16 @@ import { Metrics, MetricsSample } from "../../components/metrics" import api from "./api" -export const INTERVAL = 20000 +export const INTERVAL = 5000 export interface BackPressuresMetrics { outputBufferBlockingDuration: Metrics[] } // Get back pressure from meta node -> prometheus export async function getActorBackPressures() { + console.log("send api") const res: BackPressuresMetrics = await api.get( - "/metrics/actor/back_pressures", + "/metrics/actor/back_pressures" ) return res } @@ -44,25 +45,38 @@ export interface BackPressureRateInfo { backPressureRate: number } -function convertToMapAndAgg(back_pressures: BackPressureInfo[]): Map { +function convertToMapAndAgg( + back_pressures: BackPressureInfo[] +): Map { + // fragementId-downstreamFragementId, total value + const map_value = new Map() + // fragementId-downstreamFragementId, total count + const map_number = new Map() + // fragementId-downstreamFragementId, average value const map = new Map() for (const item of back_pressures) { const key = `${item.fragmentId}-${item.downstreamFragmentId}` - if (map.has(key)) { - map.set(key, map.get(key) + item.value) + if (map_value.has(key)) { + map_value.set(key, map_value.get(key) + item.value) + map_number.set(key, map_number.get(key) + 1) } else { - map.set(key, item.value) + map_value.set(key, item.value) + map_number.set(key, 1) } } + + for (const [key, value] of map_value) { + map.set(key, value / map_number.get(key)) + } return map } -function convertFromMapAndAgg(map: Map): BackPressureRateInfo[] { +function convertFromMapAndAgg( + map: Map +): BackPressureRateInfo[] { const result: BackPressureRateInfo[] = [] map.forEach((value, key) => { - const [fragmentId, downstreamFragmentId] = key - .split("-") - .map(Number) + const [fragmentId, downstreamFragmentId] = key.split("-").map(Number) const backPressureRateInfo: BackPressureRateInfo = { actorId: 0, fragmentId, @@ -74,7 +88,9 @@ function convertFromMapAndAgg(map: Map): BackPressureRateInfo[] return result } -function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPressuresMetrics { +function convertToBackPressureMetrics( + bp_rates: BackPressureRateInfo[] +): BackPressuresMetrics { const bp_metrics: BackPressuresMetrics = { outputBufferBlockingDuration: [], } @@ -85,10 +101,12 @@ function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPre fragment_id: item.fragmentId.toString(), downstream_fragment_id: item.downstreamFragmentId.toString(), }, - sample: [{ - timestamp: Date.now() - , value: item.backPressureRate - }], + sample: [ + { + timestamp: Date.now(), + value: item.backPressureRate, + }, + ], }) } return bp_metrics @@ -96,7 +114,7 @@ function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPre export function calculateBPRate( back_pressure_new: BackPressureInfo[], - back_pressure_old: BackPressureInfo[], + back_pressure_old: BackPressureInfo[] ): BackPressuresMetrics { let map_new = convertToMapAndAgg(back_pressure_new) let map_old = convertToMapAndAgg(back_pressure_old) @@ -105,7 +123,7 @@ export function calculateBPRate( if (map_old.has(key)) { result.set( key, - (value - map_old.get(key)) / (INTERVAL * 1000000000), + (value - map_old.get(key)) / ((INTERVAL / 1000) * 1000000000) ) } else { result.set(key, 0) @@ -132,10 +150,10 @@ export const BackPressureInfo = { export async function getBackPressureWithoutPrometheus() { const response = await api.get("/metrics/back_pressures") let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map( - BackPressureInfo.fromJSON, + BackPressureInfo.fromJSON ) back_pressure_infos = back_pressure_infos.sort( - (a, b) => a.actorId - b.actorId, + (a, b) => a.actorId - b.actorId ) return back_pressure_infos } diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 54eefc0760823..59a9db549ce9e 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -29,7 +29,7 @@ import { } from "@chakra-ui/react" import * as d3 from "d3" import { dagStratify } from "d3-dag" -import _ from "lodash" +import _, { sortBy } from "lodash" import Head from "next/head" import { parseAsInteger, useQueryState } from "nuqs" import { Fragment, useCallback, useEffect, useMemo, useState } from "react" @@ -41,9 +41,19 @@ import { FragmentBox } from "../lib/layout" import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" import useFetch from "./api/fetch" -import { BackPressureInfo, getBackPressureWithoutPrometheus, BackPressuresMetrics, getActorBackPressures, BackPressureRateInfo, p50, p90, p95, p99, calculateBPRate, INTERVAL } from "./api/metric" +import { + BackPressureInfo, + BackPressuresMetrics, + INTERVAL, + calculateBPRate, + getActorBackPressures, + getBackPressureWithoutPrometheus, + p50, + p90, + p95, + p99, +} from "./api/metric" import { getFragments, getStreamingJobs } from "./api/streaming" -import { sortBy } from "lodash" interface DispatcherNode { [actorId: number]: Dispatcher[] @@ -169,8 +179,20 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 200 -type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" | "current" -const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99", "current"] +type BackPressureAlgo = "disable" | "p50" | "p90" | "p95" | "p99" +const backPressureAlgos: BackPressureAlgo[] = [ + "disable", + "p50", + "p90", + "p95", + "p99", +] + +type BackPressureDataSourceAlgo = "Embedded" | "Prometheus" +const backPressureDataSourceAlgos: BackPressureDataSourceAlgo[] = [ + "Embedded", + "Prometheus", +] export default function Streaming() { const { response: relationList } = useFetch(getStreamingJobs) @@ -179,6 +201,13 @@ export default function Streaming() { const [relationId, setRelationId] = useQueryState("id", parseAsInteger) const [backPressureAlgo, setBackPressureAlgo] = useQueryState("backPressure") const [selectedFragmentId, setSelectedFragmentId] = useState() + // used to get the data source + const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] = + useState("Embedded") + // used to set the back pressure choice list + const [backPressureAlgoOptions, setBackPressureAlgoOptions] = useState( + [] as BackPressureAlgo[] + ) const { response: actorBackPressures } = useFetch( getActorBackPressures, @@ -211,46 +240,48 @@ export default function Streaming() { } } } - return () => { } + return () => {} }, [relationId, relationList, setRelationId]) + useEffect(() => { + if (backPressureDataSourceAlgo === "Prometheus") { + setBackPressureAlgoOptions(backPressureAlgos) + } else { + setBackPressureAlgoOptions([]) + } + }, [backPressureDataSourceAlgo]) // get back pressure rate without prometheus const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = useState() const [previousBP, setPreviousBP] = useState([]) - const [backPressureRate, setBackPressureRate] = useState< - BackPressureRateInfo[] - >([]) + const [currentBP, setCurrentBP] = useState([]) const toast = useErrorToast() useEffect(() => { - let localPreviousBP = previousBP - - async function doFetch() { - while (true) { - try { - const currentBP = await getBackPressureWithoutPrometheus() - const metrics = calculateBPRate(currentBP, localPreviousBP) - setBackPressureRate(backPressureRate) - localPreviousBP = currentBP - setPreviousBP(currentBP) - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) - ) - - setBackPressuresMetrics(metrics) - await new Promise((resolve) => setTimeout(resolve, INTERVAL)) - } catch (e: any) { - toast(e, "warning") - break - } + const interval = setInterval(() => { + const fetchNewBP = async () => { + const newBP = await getBackPressureWithoutPrometheus() + setPreviousBP(currentBP) + setCurrentBP(newBP) } + + fetchNewBP().catch(console.error) + }, INTERVAL) + + return () => clearInterval(interval) + }, [currentBP]) + + useEffect(() => { + if (currentBP !== null && previousBP !== null) { + const metrics = calculateBPRate(currentBP, previousBP) + metrics.outputBufferBlockingDuration = sortBy( + metrics.outputBufferBlockingDuration, + (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) + ) + setBackPressuresMetrics(metrics) } - doFetch() - return () => { } - }, [toast]) + }, [currentBP, previousBP]) const fragmentDependency = fragmentDependencyCallback()?.fragmentDep const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag @@ -316,19 +347,29 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if ((actorBackPressures || backPressuresMetricsWithoutPromtheus) && backPressureAlgo) { + if ( + (actorBackPressures && + backPressureAlgo && + backPressureAlgo !== "disable") || + backPressuresMetricsWithoutPromtheus + ) { let map = new Map() - if (backPressureAlgo === "current" && backPressuresMetricsWithoutPromtheus) { + if ( + backPressureDataSourceAlgo === "Embedded" && + backPressuresMetricsWithoutPromtheus + ) { for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { map.set( `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, m.sample[0].value ) } - } else if (backPressureAlgo !== "current" && actorBackPressures) { + } else if ( + backPressureDataSourceAlgo !== "Embedded" && + actorBackPressures + ) { for (const m of actorBackPressures.outputBufferBlockingDuration) { - console.log(backPressureAlgo) let algoFunc switch (backPressureAlgo) { case "p50": @@ -356,7 +397,12 @@ export default function Streaming() { } return map } - }, [actorBackPressures, backPressureAlgo, backPressuresMetricsWithoutPromtheus]) + }, [ + backPressureDataSourceAlgo, + actorBackPressures, + backPressureAlgo, + backPressuresMetricsWithoutPromtheus, + ]) const retVal = ( @@ -427,19 +473,30 @@ export default function Streaming() { - Back Pressure + Data Source + + + Back Pressure + From c6b3073e882b2700969dad1902617b11feeed8fc Mon Sep 17 00:00:00 2001 From: Yufan Song <33971064+yufansong@users.noreply.github.com> Date: Mon, 5 Feb 2024 15:35:57 -0800 Subject: [PATCH 06/14] Update dashboard/pages/fragment_graph.tsx Co-authored-by: Bugen Zhao --- dashboard/pages/fragment_graph.tsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 0ab8b4ed2a4bf..488756acb00b7 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -188,8 +188,8 @@ const backPressureAlgos: BackPressureAlgo[] = [ "p99", ] -type BackPressureDataSourceAlgo = "Embedded" | "Prometheus" -const backPressureDataSourceAlgos: BackPressureDataSourceAlgo[] = [ +type BackPressureDataSource = "Embedded" | "Prometheus" +const backPressureDataSources: BackPressureDataSourceAlgo[] = [ "Embedded", "Prometheus", ] From 4e0138e86da524068683d428de0a57b6a9ba8cc5 Mon Sep 17 00:00:00 2001 From: Yufan Song <33971064+yufansong@users.noreply.github.com> Date: Mon, 5 Feb 2024 15:36:06 -0800 Subject: [PATCH 07/14] Update dashboard/pages/fragment_graph.tsx Co-authored-by: Bugen Zhao --- dashboard/pages/fragment_graph.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 488756acb00b7..61834db34298b 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -488,7 +488,7 @@ export default function Streaming() { - Back Pressure + Back Pressure Algorithm - - - Back Pressure Algorithm - + {backPressureAlgoOptions.length > 0 && ( + + Back Pressure Algorithm + + + )} Fragments {fragmentDependencyDag && ( From 37b4b93e48b69971ee107afac897f7d57cc8737f Mon Sep 17 00:00:00 2001 From: yufansong Date: Mon, 5 Feb 2024 19:05:33 -0800 Subject: [PATCH 09/14] fix select box default showing --- dashboard/pages/fragment_graph.tsx | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index d1fe0b7a96d2e..46c79e266d51a 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -179,9 +179,8 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 200 -type BackPressureAlgo = "disable" | "p50" | "p90" | "p95" | "p99" +type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" const backPressureAlgos: BackPressureAlgo[] = [ - "disable", "p50", "p90", "p95", @@ -212,7 +211,7 @@ export default function Streaming() { const { response: actorBackPressures } = useFetch( getActorBackPressures, INTERVAL, - backPressureAlgo !== null + backPressureDataSourceAlgo === "Prometheus" && backPressureAlgo !== null ) const fragmentDependencyCallback = useCallback(() => { @@ -350,7 +349,7 @@ export default function Streaming() { if ( (actorBackPressures && backPressureAlgo && - backPressureAlgo !== "disable") || + backPressureAlgo) || backPressuresMetricsWithoutPromtheus ) { let map = new Map() @@ -496,6 +495,7 @@ export default function Streaming() { setBackPressureAlgo(event.target.value as BackPressureAlgo); }} > + {backPressureAlgoOptions.map((algo) => (