Skip to content

Commit

Permalink
apply suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
yufansong committed Feb 2, 2024
1 parent 9ae89d7 commit 62c0bea
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 289 deletions.
106 changes: 0 additions & 106 deletions dashboard/components/BackPressureTable.tsx

This file was deleted.

111 changes: 0 additions & 111 deletions dashboard/components/BackPressureTableWithoutPrometheus.tsx

This file was deleted.

1 change: 0 additions & 1 deletion dashboard/components/FragmentGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,6 @@ export default function FragmentGraph({
<g className="fragment-edges" />
<g className="fragments" />
</svg>
{/* <BackPressureTable selectedFragmentIds={includedFragmentIds} /> */}
</Fragment>
)
}
Expand Down
4 changes: 0 additions & 4 deletions dashboard/mock-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,3 @@ app.get("/metrics/actor/back_pressures", (req, res, next) => {
app.get("/monitor/await_tree/1", (req, res, next) => {
res.json(require("./mock/await_tree_1.json"))
})

app.get("/metrics/back_pressures", (req, res, next) => {
res.json(require("./mock/back_pressures.json"))
})
1 change: 0 additions & 1 deletion dashboard/mock/fetch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,4 @@ curl http://localhost:5691/api/sinks > sinks.json
curl http://localhost:5691/api/sources > sources.json
curl http://localhost:5691/api/metrics/cluster > metrics_cluster.json
curl http://localhost:5691/api/metrics/actor/back_pressures > actor_back_pressures.json
curl http://localhost:5691/api/metrics/back_pressures > back_pressures.json
curl http://localhost:5691/api/monitor/await_tree/1 > await_tree_1.json
56 changes: 37 additions & 19 deletions dashboard/pages/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
import { Metrics, MetricsSample } from "../../components/metrics"
import api from "./api"

export const INTERVAL = 20000
export const INTERVAL = 5000
export interface BackPressuresMetrics {
outputBufferBlockingDuration: Metrics[]
}

// Get back pressure from meta node -> prometheus
export async function getActorBackPressures() {
console.log("send api")
const res: BackPressuresMetrics = await api.get(
"/metrics/actor/back_pressures",
"/metrics/actor/back_pressures"
)
return res
}
Expand All @@ -44,25 +45,38 @@ export interface BackPressureRateInfo {
backPressureRate: number
}

function convertToMapAndAgg(back_pressures: BackPressureInfo[]): Map<string, number> {
function convertToMapAndAgg(
back_pressures: BackPressureInfo[]
): Map<string, number> {
// fragementId-downstreamFragementId, total value

Check warning on line 51 in dashboard/pages/api/metric.ts

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"fragement" should be "fragment".

Check warning on line 51 in dashboard/pages/api/metric.ts

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Fragement" should be "Fragment".
const map_value = new Map<string, number>()
// fragementId-downstreamFragementId, total count

Check warning on line 53 in dashboard/pages/api/metric.ts

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"fragement" should be "fragment".

Check warning on line 53 in dashboard/pages/api/metric.ts

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Fragement" should be "Fragment".
const map_number = new Map<string, number>()
// fragementId-downstreamFragementId, average value

Check warning on line 55 in dashboard/pages/api/metric.ts

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"fragement" should be "fragment".

Check warning on line 55 in dashboard/pages/api/metric.ts

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Fragement" should be "Fragment".
const map = new Map<string, number>()
for (const item of back_pressures) {
const key = `${item.fragmentId}-${item.downstreamFragmentId}`
if (map.has(key)) {
map.set(key, map.get(key) + item.value)
if (map_value.has(key)) {
map_value.set(key, map_value.get(key) + item.value)
map_number.set(key, map_number.get(key) + 1)
} else {
map.set(key, item.value)
map_value.set(key, item.value)
map_number.set(key, 1)
}
}

for (const [key, value] of map_value) {
map.set(key, value / map_number.get(key))
}
return map
}

function convertFromMapAndAgg(map: Map<string, number>): BackPressureRateInfo[] {
function convertFromMapAndAgg(
map: Map<string, number>
): BackPressureRateInfo[] {
const result: BackPressureRateInfo[] = []
map.forEach((value, key) => {
const [fragmentId, downstreamFragmentId] = key
.split("-")
.map(Number)
const [fragmentId, downstreamFragmentId] = key.split("-").map(Number)
const backPressureRateInfo: BackPressureRateInfo = {
actorId: 0,
fragmentId,
Expand All @@ -74,7 +88,9 @@ function convertFromMapAndAgg(map: Map<string, number>): BackPressureRateInfo[]
return result
}

function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPressuresMetrics {
function convertToBackPressureMetrics(
bp_rates: BackPressureRateInfo[]
): BackPressuresMetrics {
const bp_metrics: BackPressuresMetrics = {
outputBufferBlockingDuration: [],
}
Expand All @@ -85,18 +101,20 @@ function convertToBackPressureMetrics(bp_rates: BackPressureRateInfo[]): BackPre
fragment_id: item.fragmentId.toString(),
downstream_fragment_id: item.downstreamFragmentId.toString(),
},
sample: [{
timestamp: Date.now()
, value: item.backPressureRate
}],
sample: [
{
timestamp: Date.now(),
value: item.backPressureRate,
},
],
})
}
return bp_metrics
}

export function calculateBPRate(
back_pressure_new: BackPressureInfo[],
back_pressure_old: BackPressureInfo[],
back_pressure_old: BackPressureInfo[]
): BackPressuresMetrics {
let map_new = convertToMapAndAgg(back_pressure_new)
let map_old = convertToMapAndAgg(back_pressure_old)
Expand All @@ -105,7 +123,7 @@ export function calculateBPRate(
if (map_old.has(key)) {
result.set(
key,
(value - map_old.get(key)) / (INTERVAL * 1000000000),
(value - map_old.get(key)) / ((INTERVAL / 1000) * 1000000000)
)
} else {
result.set(key, 0)
Expand All @@ -132,10 +150,10 @@ export const BackPressureInfo = {
export async function getBackPressureWithoutPrometheus() {
const response = await api.get("/metrics/back_pressures")
let back_pressure_infos: BackPressureInfo[] = response.backPressureInfos.map(
BackPressureInfo.fromJSON,
BackPressureInfo.fromJSON
)
back_pressure_infos = back_pressure_infos.sort(
(a, b) => a.actorId - b.actorId,
(a, b) => a.actorId - b.actorId
)
return back_pressure_infos
}
Expand Down
Loading

0 comments on commit 62c0bea

Please sign in to comment.