Skip to content

Commit

Permalink
refactor frontend code, adopt to existing streaming graph
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong committed Jan 31, 2024
1 parent 2f1a053 commit 9ae89d7
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 81 deletions.
111 changes: 111 additions & 0 deletions dashboard/components/BackPressureTableWithoutPrometheus.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2024 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import {
Table,
TableCaption,
TableContainer,
Tbody,
Td,
Th,
Thead,
Tr,
} from "@chakra-ui/react"
import { sortBy } from "lodash"
import Head from "next/head"
import { Fragment, useEffect, useState } from "react"
import useErrorToast from "../hook/useErrorToast"
import RateBar from "./RateBar"
import { BackPressureInfo, getBackPressureWithoutPrometheus, BackPressuresMetrics, calculateBPRate, INTERVAL } from "../pages/api/metric"

export default function BackPressureTableWithoutPrometheus({
selectedFragmentIds,
}: {
selectedFragmentIds: Set<string>
}) {
const [backPressuresMetrics, setBackPressuresMetrics] =
useState<BackPressuresMetrics>()
const [previousBP, setPreviousBP] = useState<BackPressureInfo[]>([])
const toast = useErrorToast()

useEffect(() => {
let localPreviousBP = previousBP

async function doFetch() {
while (true) {
try {
const currentBP = await getBackPressureWithoutPrometheus()
const metrics = calculateBPRate(currentBP, localPreviousBP)
setBackPressuresMetrics(metrics)
localPreviousBP = currentBP
setPreviousBP(currentBP)

metrics.outputBufferBlockingDuration = sortBy(
metrics.outputBufferBlockingDuration,
(m) => (m.metric.fragment_id, m.metric.downstream_fragment_id)
)
setBackPressuresMetrics(metrics)
await new Promise((resolve) => setTimeout(resolve, INTERVAL)) // refresh every 5 secs
} catch (e: any) {
toast(e, "warning")
break
}
}
}
doFetch()
return () => { }
}, [toast])

const isSelected = (fragmentId: string) => selectedFragmentIds.has(fragmentId)

const retVal = (
<TableContainer>
<Table variant="simple">
<TableCaption>Back Pressures (Last 30 minutes)</TableCaption>
<Thead>
<Tr>
<Th>Fragment IDs &rarr; Downstream</Th>
<Th>Block Rate</Th>
</Tr>
</Thead>
<Tbody>
{backPressuresMetrics &&
backPressuresMetrics.outputBufferBlockingDuration
.filter((m) => isSelected(m.metric.fragment_id))
.map((m) => (
<Tr
key={`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`}
>
<Td>{`Fragment ${m.metric.fragment_id} -> ${m.metric.downstream_fragment_id}`}</Td>
<Td>
<RateBar samples={m.sample} />
</Td>
</Tr>
))}
</Tbody>
</Table>
</TableContainer>
)
return (
<Fragment>
<Head>
<title>Streaming Back Pressure</title>
</Head>
{retVal}
</Fragment>
)
}
1 change: 0 additions & 1 deletion dashboard/components/Layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ function Layout({ children }: { children: React.ReactNode }) {
<NavTitle>Streaming</NavTitle>
<NavButton href="/dependency_graph/">Dependency Graph</NavButton>
<NavButton href="/fragment_graph/">Fragment Graph</NavButton>
<NavButton href="/back_pressure_rates/">Back Pressure Rates</NavButton>
</Section>
<Section>
<NavTitle>Batch</NavTitle>
Expand Down
122 changes: 98 additions & 24 deletions dashboard/pages/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,128 @@
*
*/
import { Metrics, MetricsSample } from "../../components/metrics"
import { Field } from "../../proto/gen/plan_common"
import api from "./api"

export const INTERVAL = 20000
export interface BackPressuresMetrics {
outputBufferBlockingDuration: Metrics[]
}

// Get back pressure from meta node -> prometheus
export async function getActorBackPressures() {
const res: BackPressuresMetrics = await api.get(
"/metrics/actor/back_pressures"
"/metrics/actor/back_pressures",
)
return res
}

export interface BackPressureInfo {
id: number;
name: string;
owner: number;
columns: Field[];
actorId: number,
fragementId: number,
donwStreamFragmentId: number,
value: number,
actorId: number
fragmentId: number
downstreamFragmentId: number
value: number
}

export interface BackPressureRateInfo {
actorId: number
fragmentId: number
downstreamFragmentId: number
backPressureRate: number
}

function convertToMapAndAgg(back_pressures: BackPressureInfo[]): Map<string, number> {
const map = new Map<string, number>()
for (const item of back_pressures) {
const key = `${item.fragmentId}-${item.downstreamFragmentId}`
if (map.has(key)) {
map.set(key, map.get(key) + item.value)
} else {
map.set(key, item.value)
}
}
return map
}

function convertFromMapAndAgg(map: Map<string, number>): BackPressureRateInfo[] {
const result: BackPressureRateInfo[] = []
map.forEach((value, key) => {
const [fragmentId, downstreamFragmentId] = key
.split("-")
.map(Number)
const backPressureRateInfo: BackPressureRateInfo = {
actorId: 0,
fragmentId,
downstreamFragmentId,
backPressureRate: value,
}
result.push(backPressureRateInfo)
})
return result
}

function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPressuresMetrics {
const bp_metrics: BackPressuresMetrics = {
outputBufferBlockingDuration: [],
}
for (const item of bp_rates) {
bp_metrics.outputBufferBlockingDuration.push({
metric: {
actor_id: item.actorId.toString(),
fragment_id: item.fragmentId.toString(),
downstream_fragment_id: item.downstreamFragmentId.toString(),
},
sample: [{
timestamp: Date.now()
, value: item.backPressureRate
}],
})
}
return bp_metrics
}

export function calculateBPRate(
back_pressure_new: BackPressureInfo[],
back_pressure_old: BackPressureInfo[],
): BackPressuresMetrics {
let map_new = convertToMapAndAgg(back_pressure_new)
let map_old = convertToMapAndAgg(back_pressure_old)
let result = new Map<string, number>()
map_new.forEach((value, key) => {
if (map_old.has(key)) {
result.set(
key,
(value - map_old.get(key)) / (INTERVAL * 1000000000),
)
} else {
result.set(key, 0)
}
})

return convertToBackPressureMetrics(convertFromMapAndAgg(result))
}

export const BackPressureInfo = {
fromJSON: (object: any) => {
return {
id: 0,
name: "",
owner: 0,
columns: [],
actorId: isSet(object.actorId) ? Number(object.actorId) : 0,
fragementId: isSet(object.fragementId) ? Number(object.fragementId) : 0,
donwStreamFragmentId: isSet(object.donwStreamFragmentId) ? Number(object.donwStreamFragmentId) : 0,
fragmentId: isSet(object.fragmentId) ? Number(object.fragmentId) : 0,
downstreamFragmentId: isSet(object.downstreamFragmentId)
? Number(object.downstreamFragmentId)
: 0,
value: isSet(object.value) ? Number(object.value) : 0,
}
},
}

// Get back pressure from meta node -> compute node
export async function getComputeBackPressures() {
const response = await api.get("/metrics/back_pressures");

let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map(BackPressureInfo.fromJSON)

back_pressure_infos = back_pressure_infos.sort((a, b) => a.actorId - b.actorId)
export async function getBackPressureWithoutPrometheus() {
const response = await api.get("/metrics/back_pressures")
let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map(
BackPressureInfo.fromJSON,
)
back_pressure_infos = back_pressure_infos.sort(
(a, b) => a.actorId - b.actorId,
)
return back_pressure_infos
}

Expand All @@ -89,5 +163,5 @@ export function p99(samples: MetricsSample[]) {
}

function isSet(value: any): boolean {
return value !== null && value !== undefined;
}
return value !== null && value !== undefined
}
24 changes: 0 additions & 24 deletions dashboard/pages/back_pressure_rates.tsx

This file was deleted.

Loading

0 comments on commit 9ae89d7

Please sign in to comment.