Skip to content

Commit

Permalink
add spill
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Aug 23, 2024
1 parent 84b7b94 commit 715ef63
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 21 deletions.
6 changes: 4 additions & 2 deletions dashboard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ For example:

```bash
./risedev d
sqllogictest -p 4566 -d dev './e2e_test/nexmark/create_tables.slt.part'
sqllogictest -p 4566 -d dev './e2e_test/streaming/nexmark/create_views.slt.part'
./risedev slt e2e_test/nexmark/create_sources.slt.part
./risedev psql -c 'CREATE TABLE dimension (v1 int);'
./risedev psql -c 'CREATE MATERIALIZED VIEW mv AS SELECT auction.* FROM dimension join auction on auction.id-auction.id = dimension.v1;'
./risedev psql -c 'INSERT INTO dimension select 0 from generate_series(1, 50);'
```

Install dependencies and start the development server.
Expand Down
48 changes: 40 additions & 8 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,55 @@ function convertToBackPressureMetrics(
return bpMetrics
}

export function calculateSpill(
current: BackPressureInfo[],
newBP: BackPressureInfo[],
mapSpilled: Map<string, number>,
intervalMs: number
): Map<string, number> {
let mapOld = convertToMapAndAgg(current)
let mapNew = convertToMapAndAgg(newBP)
let allKeys = new Set([
...mapNew.keys(),
...mapOld.keys(),
...mapSpilled.keys(),
])
let newSpilled = new Map<string, number>()
allKeys.forEach((key) => {
if (mapOld.has(key)) {
let newValue = mapNew.get(key) || 0
let oldValue = mapOld.get(key) || 0
let oldSpill = mapSpilled.get(key) || 0
let intervalNs = intervalMs * 1000000
let rawSpill = newValue - oldValue - intervalNs + oldSpill
let spill = rawSpill < 0 ? 0 : rawSpill
newSpilled.set(key, spill)
} else {
newSpilled.set(key, 0)
}
})
return newSpilled
}

export function calculateBPRate(
backPressureNew: BackPressureInfo[],
backPressureOld: BackPressureInfo[],
mapSpill: Map<string, number>,
intervalMs: number
): BackPressuresMetrics {
let mapNew = convertToMapAndAgg(backPressureNew)
let mapOld = convertToMapAndAgg(backPressureOld)
let result = new Map<string, number>()
mapNew.forEach((value, key) => {
mapNew.forEach((newValue, key) => {
if (mapOld.has(key)) {
result.set(
key,
// The *100 in end of the formular is to convert the BP rate to the value used in web UI drawing
((value - (mapOld.get(key) || 0)) /
((intervalMs / 1000) * 1000000000)) *
100
)
let oldValue = mapOld.get(key) || 0
let intervalNs = intervalMs * 1000000
let spill = mapSpill.get(key) || 0
let backpressureRate = (newValue - oldValue + spill) / intervalNs
let scale = 100 // Convert BP rate from decimal to %.
let scaledBackpressureRate = backpressureRate * scale

result.set(key, scaledBackpressureRate)
} else {
result.set(key, 0)
}
Expand Down
36 changes: 25 additions & 11 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
calculateBPRate,
calculateSpill,
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
Expand Down Expand Up @@ -187,6 +188,8 @@ const backPressureDataSources: BackPressureDataSource[] = [
interface EmbeddedBackPressureInfo {
previous: BackPressureInfo[]
current: BackPressureInfo[]
// Extra duration longer than the polling interval will be spilled.
spilled: Map<string, number>
}

export default function Streaming() {
Expand Down Expand Up @@ -307,17 +310,27 @@ export default function Streaming() {
fetchEmbeddedBackPressure().then(
(newBP) => {
console.log(newBP)
setEmbeddedBackPressureInfo((prev) =>
prev
? {
previous: prev.current,
current: newBP,
}
: {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
}
)
setEmbeddedBackPressureInfo((prev) => {
if (prev) {
let spilled = calculateSpill(
prev.current,
newBP,
prev.spilled,
INTERVAL
)
return {
previous: prev.current,
current: newBP,
spilled: spilled,
}
} else {
return {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
spilled: new Map<string, number>(),
}
}
})
},
(e) => {
console.error(e)
Expand All @@ -339,6 +352,7 @@ export default function Streaming() {
const metrics = calculateBPRate(
embeddedBackPressureInfo.current,
embeddedBackPressureInfo.previous,
embeddedBackPressureInfo.spilled,
INTERVAL
)
for (const m of metrics.outputBufferBlockingDuration) {
Expand Down

0 comments on commit 715ef63

Please sign in to comment.