From 62c0beaba68698b51b1e8097ee0ad5d1e8ded59c Mon Sep 17 00:00:00 2001 From: yufansong Date: Fri, 2 Feb 2024 01:46:51 -0800 Subject: [PATCH] apply suggestions --- dashboard/components/BackPressureTable.tsx | 106 ------------ .../BackPressureTableWithoutPrometheus.tsx | 111 ------------- dashboard/components/FragmentGraph.tsx | 1 - dashboard/mock-server.js | 4 - dashboard/mock/fetch.sh | 1 - dashboard/pages/api/metric.ts | 56 ++++--- dashboard/pages/fragment_graph.tsx | 151 ++++++++++++------ 7 files changed, 141 insertions(+), 289 deletions(-) delete mode 100644 dashboard/components/BackPressureTable.tsx delete mode 100644 dashboard/components/BackPressureTableWithoutPrometheus.tsx diff --git a/dashboard/components/BackPressureTable.tsx b/dashboard/components/BackPressureTable.tsx deleted file mode 100644 index ad790c0cb680a..0000000000000 --- a/dashboard/components/BackPressureTable.tsx +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { - Table, - TableCaption, - TableContainer, - Tbody, - Td, - Th, - Thead, - Tr, -} from "@chakra-ui/react" -import { sortBy } from "lodash" -import Head from "next/head" -import { Fragment, useEffect, useState } from "react" -import useErrorToast from "../hook/useErrorToast" -import { - BackPressuresMetrics, - getActorBackPressures, -} from "../pages/api/metric" -import RateBar from "./RateBar" - -export default function BackPressureTable({ - selectedFragmentIds, -}: { - selectedFragmentIds: Set -}) { - const [backPressuresMetrics, setBackPressuresMetrics] = - useState() - const toast = useErrorToast() - - useEffect(() => { - async function doFetch() { - while (true) { - try { - let metrics = await getActorBackPressures() - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) - ) - setBackPressuresMetrics(metrics) - await new Promise((resolve) => setTimeout(resolve, 5000)) // refresh every 5 secs - } catch (e: any) { - toast(e, "warning") - break - } - } - } - doFetch() - return () => {} - }, [toast]) - - const isSelected = (fragmentId: string) => selectedFragmentIds.has(fragmentId) - - const retVal = ( - - - Back Pressures (Last 30 minutes) - - - - - - - - {backPressuresMetrics && - backPressuresMetrics.outputBufferBlockingDuration - .filter((m) => isSelected(m.metric.fragment_id)) - .map((m) => ( - - - - - ))} - -
Fragment IDs → DownstreamBlock Rate
{`Fragment ${m.metric.fragment_id} -> ${m.metric.downstream_fragment_id}`} - -
-
- ) - return ( - - - Streaming Back Pressure - - {retVal} - - ) -} diff --git a/dashboard/components/BackPressureTableWithoutPrometheus.tsx b/dashboard/components/BackPressureTableWithoutPrometheus.tsx deleted file mode 100644 index 02b949a61d94a..0000000000000 --- a/dashboard/components/BackPressureTableWithoutPrometheus.tsx +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2024 RisingWave Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { - Table, - TableCaption, - TableContainer, - Tbody, - Td, - Th, - Thead, - Tr, -} from "@chakra-ui/react" -import { sortBy } from "lodash" -import Head from "next/head" -import { Fragment, useEffect, useState } from "react" -import useErrorToast from "../hook/useErrorToast" -import RateBar from "./RateBar" -import { BackPressureInfo, getBackPressureWithoutPrometheus, BackPressuresMetrics, calculateBPRate, INTERVAL } from "../pages/api/metric" - -export default function BackPressureTableWithoutPrometheus({ - selectedFragmentIds, -}: { - selectedFragmentIds: Set -}) { - const [backPressuresMetrics, setBackPressuresMetrics] = - useState() - const [previousBP, setPreviousBP] = useState([]) - const toast = useErrorToast() - - useEffect(() => { - let localPreviousBP = previousBP - - async function doFetch() { - while (true) { - try { - const currentBP = await getBackPressureWithoutPrometheus() - const metrics = calculateBPRate(currentBP, localPreviousBP) - setBackPressuresMetrics(metrics) - localPreviousBP = currentBP - setPreviousBP(currentBP) - - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) - ) - setBackPressuresMetrics(metrics) - await new Promise((resolve) => setTimeout(resolve, INTERVAL)) // refresh every 5 secs - } catch (e: any) { - toast(e, "warning") - break - } - } - } - doFetch() - return () => { } - }, [toast]) - - const isSelected = (fragmentId: string) => selectedFragmentIds.has(fragmentId) - - const retVal = ( - - - Back Pressures (Last 30 minutes) - - - - - - - - {backPressuresMetrics && - backPressuresMetrics.outputBufferBlockingDuration - .filter((m) => isSelected(m.metric.fragment_id)) - .map((m) => ( - - - - - ))} - -
Fragment IDs → DownstreamBlock Rate
{`Fragment ${m.metric.fragment_id} -> ${m.metric.downstream_fragment_id}`} - -
-
- ) - return ( - - - Streaming Back Pressure - - {retVal} - - ) -} diff --git a/dashboard/components/FragmentGraph.tsx b/dashboard/components/FragmentGraph.tsx index 72184d1b2a8bc..6ca2c14dda28e 100644 --- a/dashboard/components/FragmentGraph.tsx +++ b/dashboard/components/FragmentGraph.tsx @@ -479,7 +479,6 @@ export default function FragmentGraph({ - {/* */} ) } diff --git a/dashboard/mock-server.js b/dashboard/mock-server.js index 11365f33ddc07..31d67bd87b772 100644 --- a/dashboard/mock-server.js +++ b/dashboard/mock-server.js @@ -80,7 +80,3 @@ app.get("/metrics/actor/back_pressures", (req, res, next) => { app.get("/monitor/await_tree/1", (req, res, next) => { res.json(require("./mock/await_tree_1.json")) }) - -app.get("/metrics/back_pressures", (req, res, next) => { - res.json(require("./mock/back_pressures.json")) -}) diff --git a/dashboard/mock/fetch.sh b/dashboard/mock/fetch.sh index baf86c3dc4ab7..5cc278e01d0c5 100755 --- a/dashboard/mock/fetch.sh +++ b/dashboard/mock/fetch.sh @@ -18,5 +18,4 @@ curl http://localhost:5691/api/sinks > sinks.json curl http://localhost:5691/api/sources > sources.json curl http://localhost:5691/api/metrics/cluster > metrics_cluster.json curl http://localhost:5691/api/metrics/actor/back_pressures > actor_back_pressures.json -curl http://localhost:5691/api/metrics/back_pressures > back_pressures.json curl http://localhost:5691/api/monitor/await_tree/1 > await_tree_1.json diff --git a/dashboard/pages/api/metric.ts b/dashboard/pages/api/metric.ts index cc258fe2d920b..7fad567699cf8 100644 --- a/dashboard/pages/api/metric.ts +++ b/dashboard/pages/api/metric.ts @@ -17,15 +17,16 @@ import { Metrics, MetricsSample } from "../../components/metrics" import api from "./api" -export const INTERVAL = 20000 +export const INTERVAL = 5000 export interface BackPressuresMetrics { outputBufferBlockingDuration: Metrics[] } // Get back pressure from meta node -> prometheus export async function getActorBackPressures() { + console.log("send api") const res: BackPressuresMetrics = await api.get( - "/metrics/actor/back_pressures", + "/metrics/actor/back_pressures" ) return res } @@ -44,25 +45,38 @@ export interface BackPressureRateInfo { backPressureRate: number } -function convertToMapAndAgg(back_pressures: BackPressureInfo[]): Map { +function convertToMapAndAgg( + back_pressures: BackPressureInfo[] +): Map { + // fragementId-downstreamFragementId, total value + const map_value = new Map() + // fragementId-downstreamFragementId, total count + const map_number = new Map() + // fragementId-downstreamFragementId, average value const map = new Map() for (const item of back_pressures) { const key = `${item.fragmentId}-${item.downstreamFragmentId}` - if (map.has(key)) { - map.set(key, map.get(key) + item.value) + if (map_value.has(key)) { + map_value.set(key, map_value.get(key) + item.value) + map_number.set(key, map_number.get(key) + 1) } else { - map.set(key, item.value) + map_value.set(key, item.value) + map_number.set(key, 1) } } + + for (const [key, value] of map_value) { + map.set(key, value / map_number.get(key)) + } return map } -function convertFromMapAndAgg(map: Map): BackPressureRateInfo[] { +function convertFromMapAndAgg( + map: Map +): BackPressureRateInfo[] { const result: BackPressureRateInfo[] = [] map.forEach((value, key) => { - const [fragmentId, downstreamFragmentId] = key - .split("-") - .map(Number) + const [fragmentId, downstreamFragmentId] = key.split("-").map(Number) const backPressureRateInfo: BackPressureRateInfo = { actorId: 0, fragmentId, @@ -74,7 +88,9 @@ function convertFromMapAndAgg(map: Map): BackPressureRateInfo[] return result } -function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPressuresMetrics { +function convertToBackPressureMetrics( + bp_rates: BackPressureRateInfo[] +): BackPressuresMetrics { const bp_metrics: BackPressuresMetrics = { outputBufferBlockingDuration: [], } @@ -85,10 +101,12 @@ function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPre fragment_id: item.fragmentId.toString(), downstream_fragment_id: item.downstreamFragmentId.toString(), }, - sample: [{ - timestamp: Date.now() - , value: item.backPressureRate - }], + sample: [ + { + timestamp: Date.now(), + value: item.backPressureRate, + }, + ], }) } return bp_metrics @@ -96,7 +114,7 @@ function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPre export function calculateBPRate( back_pressure_new: BackPressureInfo[], - back_pressure_old: BackPressureInfo[], + back_pressure_old: BackPressureInfo[] ): BackPressuresMetrics { let map_new = convertToMapAndAgg(back_pressure_new) let map_old = convertToMapAndAgg(back_pressure_old) @@ -105,7 +123,7 @@ export function calculateBPRate( if (map_old.has(key)) { result.set( key, - (value - map_old.get(key)) / (INTERVAL * 1000000000), + (value - map_old.get(key)) / ((INTERVAL / 1000) * 1000000000) ) } else { result.set(key, 0) @@ -132,10 +150,10 @@ export const BackPressureInfo = { export async function getBackPressureWithoutPrometheus() { const response = await api.get("/metrics/back_pressures") let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map( - BackPressureInfo.fromJSON, + BackPressureInfo.fromJSON ) back_pressure_infos = back_pressure_infos.sort( - (a, b) => a.actorId - b.actorId, + (a, b) => a.actorId - b.actorId ) return back_pressure_infos } diff --git a/dashboard/pages/fragment_graph.tsx b/dashboard/pages/fragment_graph.tsx index 54eefc0760823..59a9db549ce9e 100644 --- a/dashboard/pages/fragment_graph.tsx +++ b/dashboard/pages/fragment_graph.tsx @@ -29,7 +29,7 @@ import { } from "@chakra-ui/react" import * as d3 from "d3" import { dagStratify } from "d3-dag" -import _ from "lodash" +import _, { sortBy } from "lodash" import Head from "next/head" import { parseAsInteger, useQueryState } from "nuqs" import { Fragment, useCallback, useEffect, useMemo, useState } from "react" @@ -41,9 +41,19 @@ import { FragmentBox } from "../lib/layout" import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta" import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan" import useFetch from "./api/fetch" -import { BackPressureInfo, getBackPressureWithoutPrometheus, BackPressuresMetrics, getActorBackPressures, BackPressureRateInfo, p50, p90, p95, p99, calculateBPRate, INTERVAL } from "./api/metric" +import { + BackPressureInfo, + BackPressuresMetrics, + INTERVAL, + calculateBPRate, + getActorBackPressures, + getBackPressureWithoutPrometheus, + p50, + p90, + p95, + p99, +} from "./api/metric" import { getFragments, getStreamingJobs } from "./api/streaming" -import { sortBy } from "lodash" interface DispatcherNode { [actorId: number]: Dispatcher[] @@ -169,8 +179,20 @@ function buildFragmentDependencyAsEdges( const SIDEBAR_WIDTH = 200 -type BackPressureAlgo = "p50" | "p90" | "p95" | "p99" | "current" -const backPressureAlgos: BackPressureAlgo[] = ["p50", "p90", "p95", "p99", "current"] +type BackPressureAlgo = "disable" | "p50" | "p90" | "p95" | "p99" +const backPressureAlgos: BackPressureAlgo[] = [ + "disable", + "p50", + "p90", + "p95", + "p99", +] + +type BackPressureDataSourceAlgo = "Embedded" | "Prometheus" +const backPressureDataSourceAlgos: BackPressureDataSourceAlgo[] = [ + "Embedded", + "Prometheus", +] export default function Streaming() { const { response: relationList } = useFetch(getStreamingJobs) @@ -179,6 +201,13 @@ export default function Streaming() { const [relationId, setRelationId] = useQueryState("id", parseAsInteger) const [backPressureAlgo, setBackPressureAlgo] = useQueryState("backPressure") const [selectedFragmentId, setSelectedFragmentId] = useState() + // used to get the data source + const [backPressureDataSourceAlgo, setBackPressureDataSourceAlgo] = + useState("Embedded") + // used to set the back pressure choice list + const [backPressureAlgoOptions, setBackPressureAlgoOptions] = useState( + [] as BackPressureAlgo[] + ) const { response: actorBackPressures } = useFetch( getActorBackPressures, @@ -211,46 +240,48 @@ export default function Streaming() { } } } - return () => { } + return () => {} }, [relationId, relationList, setRelationId]) + useEffect(() => { + if (backPressureDataSourceAlgo === "Prometheus") { + setBackPressureAlgoOptions(backPressureAlgos) + } else { + setBackPressureAlgoOptions([]) + } + }, [backPressureDataSourceAlgo]) // get back pressure rate without prometheus const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] = useState() const [previousBP, setPreviousBP] = useState([]) - const [backPressureRate, setBackPressureRate] = useState< - BackPressureRateInfo[] - >([]) + const [currentBP, setCurrentBP] = useState([]) const toast = useErrorToast() useEffect(() => { - let localPreviousBP = previousBP - - async function doFetch() { - while (true) { - try { - const currentBP = await getBackPressureWithoutPrometheus() - const metrics = calculateBPRate(currentBP, localPreviousBP) - setBackPressureRate(backPressureRate) - localPreviousBP = currentBP - setPreviousBP(currentBP) - metrics.outputBufferBlockingDuration = sortBy( - metrics.outputBufferBlockingDuration, - (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) - ) - - setBackPressuresMetrics(metrics) - await new Promise((resolve) => setTimeout(resolve, INTERVAL)) - } catch (e: any) { - toast(e, "warning") - break - } + const interval = setInterval(() => { + const fetchNewBP = async () => { + const newBP = await getBackPressureWithoutPrometheus() + setPreviousBP(currentBP) + setCurrentBP(newBP) } + + fetchNewBP().catch(console.error) + }, INTERVAL) + + return () => clearInterval(interval) + }, [currentBP]) + + useEffect(() => { + if (currentBP !== null && previousBP !== null) { + const metrics = calculateBPRate(currentBP, previousBP) + metrics.outputBufferBlockingDuration = sortBy( + metrics.outputBufferBlockingDuration, + (m) => (m.metric.fragment_id, m.metric.downstream_fragment_id) + ) + setBackPressuresMetrics(metrics) } - doFetch() - return () => { } - }, [toast]) + }, [currentBP, previousBP]) const fragmentDependency = fragmentDependencyCallback()?.fragmentDep const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag @@ -316,19 +347,29 @@ export default function Streaming() { } const backPressures = useMemo(() => { - if ((actorBackPressures || backPressuresMetricsWithoutPromtheus) && backPressureAlgo) { + if ( + (actorBackPressures && + backPressureAlgo && + backPressureAlgo !== "disable") || + backPressuresMetricsWithoutPromtheus + ) { let map = new Map() - if (backPressureAlgo === "current" && backPressuresMetricsWithoutPromtheus) { + if ( + backPressureDataSourceAlgo === "Embedded" && + backPressuresMetricsWithoutPromtheus + ) { for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) { map.set( `${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`, m.sample[0].value ) } - } else if (backPressureAlgo !== "current" && actorBackPressures) { + } else if ( + backPressureDataSourceAlgo !== "Embedded" && + actorBackPressures + ) { for (const m of actorBackPressures.outputBufferBlockingDuration) { - console.log(backPressureAlgo) let algoFunc switch (backPressureAlgo) { case "p50": @@ -356,7 +397,12 @@ export default function Streaming() { } return map } - }, [actorBackPressures, backPressureAlgo, backPressuresMetricsWithoutPromtheus]) + }, [ + backPressureDataSourceAlgo, + actorBackPressures, + backPressureAlgo, + backPressuresMetricsWithoutPromtheus, + ]) const retVal = ( @@ -427,19 +473,30 @@ export default function Streaming() { - Back Pressure + Data Source + + + Back Pressure +