Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(dashboard): refactor back-pressure calculation #20001

Merged
merged 5 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 0 additions & 46 deletions dashboard/components/RateBar.tsx

This file was deleted.

174 changes: 0 additions & 174 deletions dashboard/lib/api/metric.ts

This file was deleted.

118 changes: 57 additions & 61 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ import FragmentDependencyGraph from "../components/FragmentDependencyGraph"
import FragmentGraph from "../components/FragmentGraph"
import Title from "../components/Title"
import useErrorToast from "../hook/useErrorToast"
import api from "../lib/api/api"
import useFetch from "../lib/api/fetch"
import {
calculateBPRate,
calculateCumulativeBp,
fetchEmbeddedBackPressure,
} from "../lib/api/metric"
import {
getFragmentsByJobId,
getRelationIdInfos,
Expand Down Expand Up @@ -190,13 +186,43 @@ function buildFragmentDependencyAsEdges(

const SIDEBAR_WIDTH = 225

// The state of the embedded back pressure metrics.
// The metrics from previous fetch are stored here to calculate the rate.
interface EmbeddedBackPressureInfo {
previous: BackPressureInfo[]
current: BackPressureInfo[]
totalBackpressureNs: BackPressureInfo[]
totalDurationNs: number
export class BackPressureSnapshot {
// The first fetch result.
// key: `<fragmentId>_<downstreamFragmentId>`
// value: output blocking duration in nanoseconds.
result: Map<string, number>

// The time of the current fetch in milliseconds. (`Date.now()`)
time: number

constructor(result: Map<string, number>, time: number) {
this.result = result
this.time = time
}

static fromResponse(channelStats: {
[key: string]: BackPressureInfo
}): BackPressureSnapshot {
const result = new Map<string, number>()
for (const [key, info] of Object.entries(channelStats)) {
result.set(key, info.value / info.actorCount)
}
return new BackPressureSnapshot(result, Date.now())
}

getRate(initial: BackPressureSnapshot): Map<string, number> {
const result = new Map<string, number>()
for (const [key, value] of this.result) {
const initialValue = initial.result.get(key)
if (initialValue) {
result.set(
key,
(value - initialValue) / (this.time - initial.time) / 1000000
)
}
}
return result
}
}

export default function Streaming() {
Expand Down Expand Up @@ -308,40 +334,30 @@ export default function Streaming() {
toast(new Error(`Actor ${searchActorIdInt} not found`))
}

// Periodically fetch embedded back-pressure from Meta node
// Didn't call `useFetch()` because the `setState` way is special.
const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] =
useState<EmbeddedBackPressureInfo>()
// Keep the initial snapshot to calculate the rate of back pressure
const [backPressureRate, setBackPressureRate] =
useState<Map<string, number>>()

const [fragmentStats, setFragmentStats] = useState<{
[key: number]: FragmentStats
}>()

useEffect(() => {
// The initial snapshot is used to calculate the rate of back pressure
// It's not used to render the page directly, so we don't need to set it in the state
let initialSnapshot: BackPressureSnapshot | undefined

function refresh() {
fetchEmbeddedBackPressure().then(
api.get("/metrics/fragment/embedded_back_pressures").then(
(response) => {
let newBP =
response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? []
setEmbeddedBackPressureInfo((prev) =>
prev
? {
previous: prev.current,
current: newBP,
totalBackpressureNs: calculateCumulativeBp(
prev.totalBackpressureNs,
prev.current,
newBP
),
totalDurationNs:
prev.totalDurationNs + INTERVAL_MS * 1000 * 1000,
}
: {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
totalBackpressureNs: [],
totalDurationNs: 0,
}
let snapshot = BackPressureSnapshot.fromResponse(
response.channelStats
)
if (!initialSnapshot) {
initialSnapshot = snapshot
} else {
setBackPressureRate(snapshot.getRate(initialSnapshot!))
}
setFragmentStats(response.fragmentStats)
},
(e) => {
Expand All @@ -350,33 +366,13 @@ export default function Streaming() {
}
)
}
refresh()
const interval = setInterval(refresh, INTERVAL_MS)
refresh() // run once immediately
const interval = setInterval(refresh, INTERVAL_MS) // and then run every interval
return () => {
clearInterval(interval)
}
}, [toast])

const backPressures = useMemo(() => {
if (embeddedBackPressureInfo) {
let map = new Map()

if (embeddedBackPressureInfo) {
const metrics = calculateBPRate(
embeddedBackPressureInfo.totalBackpressureNs,
embeddedBackPressureInfo.totalDurationNs
)
for (const m of metrics.outputBufferBlockingDuration) {
map.set(
`${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`,
m.sample[0].value
)
}
}
return map
}
}, [embeddedBackPressureInfo])

const retVal = (
<Flex p={3} height="calc(100vh - 20px)" flexDirection="column">
<Title>Fragment Graph</Title>
Expand Down Expand Up @@ -503,7 +499,7 @@ export default function Streaming() {
selectedFragmentId={selectedFragmentId?.toString()}
fragmentDependency={fragmentDependency}
planNodeDependencies={planNodeDependencies}
backPressures={backPressures}
backPressures={backPressureRate}
fragmentStats={fragmentStats}
/>
)}
Expand Down
Loading
Loading