Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dashboard): fix back-pressure dashboard & some refactorings #15389

Merged
merged 14 commits into from
Mar 5, 2024
15 changes: 8 additions & 7 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
import { Metrics, MetricsSample } from "../../components/metrics"
import api from "./api"

export const INTERVAL = 5000
/**
 * Back-pressure metrics as consumed by the dashboard.
 *
 * This is the declared type of the Prometheus back-pressure response and the
 * return type of `calculateBPRate`.
 */
export interface BackPressuresMetrics {
// The back-pressure series; callers iterate over this list to read each entry.
outputBufferBlockingDuration: Metrics[]
}

// Get back pressure from meta node -> prometheus
export async function getActorBackPressures() {
// Get back pressure from Prometheus
export async function fetchPrometheusBackPressure() {
const res: BackPressuresMetrics = await api.get(
"/metrics/fragment/prometheus_back_pressures"
)
Expand Down Expand Up @@ -114,7 +113,8 @@ function convertToBackPressureMetrics(

export function calculateBPRate(
backPressureNew: BackPressureInfo[],
backPressureOld: BackPressureInfo[]
backPressureOld: BackPressureInfo[],
intervalMs: number
): BackPressuresMetrics {
let mapNew = convertToMapAndAgg(backPressureNew)
let mapOld = convertToMapAndAgg(backPressureOld)
Expand All @@ -124,8 +124,9 @@ export function calculateBPRate(
result.set(
key,
// The *100 at the end of the formula converts the BP rate to the value used for drawing in the web UI
((value - (mapOld.get(key) || 0)) / ((INTERVAL / 1000) * 1000000000)) *
100
((value - (mapOld.get(key) || 0)) /
((intervalMs / 1000) * 1000000000)) *
100
)
} else {
result.set(key, 0)
Expand All @@ -149,7 +150,7 @@ export const BackPressureInfo = {
}

// Get back pressure from meta node -> compute node
export async function getBackPressureWithoutPrometheus() {
export async function fetchEmbeddedBackPressure() {
const response = await api.get("/metrics/fragment/embedded_back_pressures")
let backPressureInfos: BackPressureInfo[] = response.backPressureInfos.map(
BackPressureInfo.fromJSON
Expand Down
171 changes: 87 additions & 84 deletions dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
BackPressuresMetrics,
INTERVAL,
calculateBPRate,
getActorBackPressures,
getBackPressureWithoutPrometheus,
fetchEmbeddedBackPressure,
fetchPrometheusBackPressure,
} from "../lib/api/metric"
import { getFragments, getStreamingJobs } from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
Expand All @@ -55,6 +53,9 @@ interface DispatcherNode {
[actorId: number]: Dispatcher[]
}

// Refresh interval (ms) for back pressure stats.
// Passed to `useFetch` for the Prometheus source, used as the `setInterval`
// period for the embedded source, and given to `calculateBPRate` as `intervalMs`.
const INTERVAL = 5000

/** Associated data of each plan node in the fragment graph, including the dispatchers. */
export interface PlanNodeDatum {
name: string
Expand Down Expand Up @@ -181,21 +182,21 @@ const backPressureDataSources: BackPressureDataSource[] = [
"Prometheus",
]

// The state of the embedded back pressure metrics.
// The metrics from the previous fetch are kept next to the latest ones so that
// the rate can be calculated from the two.
interface EmbeddedBackPressureInfo {
// Result of the fetch before the latest one (same as `current` right after the first fetch).
previous: BackPressureInfo[]
// Result of the latest fetch.
current: BackPressureInfo[]
}

export default function Streaming() {
const { response: relationList } = useFetch(getStreamingJobs)
const { response: fragmentList } = useFetch(getFragments)

const [relationId, setRelationId] = useQueryState("id", parseAsInteger)
const [selectedFragmentId, setSelectedFragmentId] = useState<number>()
// used to get the data source
const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")

const { response: actorBackPressures } = useFetch(
getActorBackPressures,
INTERVAL,
backPressureDataSource === "Prometheus"
)
const toast = useErrorToast()

const fragmentDependencyCallback = useCallback(() => {
if (fragmentList) {
Expand All @@ -211,7 +212,6 @@ export default function Streaming() {
}
}
}
return undefined
}, [fragmentList, relationId])

useEffect(() => {
Expand All @@ -222,44 +222,8 @@ export default function Streaming() {
}
}
}
return () => {}
}, [relationId, relationList, setRelationId])

// get back pressure rate without prometheus
// TODO(bugen): extract the following logic to a hook and unify the interface
// with Prometheus data source.
const [backPressuresMetricsWithoutPromtheus, setBackPressuresMetrics] =
useState<BackPressuresMetrics>()
const [previousBP, setPreviousBP] = useState<BackPressureInfo[]>([])
const [currentBP, setCurrentBP] = useState<BackPressureInfo[]>([])
const toast = useErrorToast()

useEffect(() => {
if (backPressureDataSource === "Embedded") {
const interval = setInterval(() => {
const fetchNewBP = async () => {
const newBP = await getBackPressureWithoutPrometheus()
setPreviousBP(currentBP)
setCurrentBP(newBP)
}

fetchNewBP().catch(console.error)
}, INTERVAL)
return () => clearInterval(interval)
}
}, [currentBP, backPressureDataSource])

useEffect(() => {
if (currentBP !== null && previousBP !== null) {
const metrics = calculateBPRate(currentBP, previousBP)
metrics.outputBufferBlockingDuration = sortBy(
metrics.outputBufferBlockingDuration,
(m) => (m.metric.fragmentId, m.metric.downstreamFragmentId)
)
setBackPressuresMetrics(metrics)
}
}, [currentBP, previousBP])

const fragmentDependency = fragmentDependencyCallback()?.fragmentDep
const fragmentDependencyDag = fragmentDependencyCallback()?.fragmentDepDag
const fragments = fragmentDependencyCallback()?.fragments
Expand Down Expand Up @@ -323,49 +287,86 @@ export default function Streaming() {
toast(new Error(`Actor ${searchActorIdInt} not found`))
}

const [backPressureDataSource, setBackPressureDataSource] =
useState<BackPressureDataSource>("Embedded")

// Periodically fetch Prometheus back-pressure from Meta node
const { response: promethusMetrics } = useFetch(
fetchPrometheusBackPressure,
INTERVAL,
backPressureDataSource === "Prometheus"
)

// Periodically fetch embedded back-pressure from Meta node
// `useFetch()` is not used here because each update needs the previous state:
// the last `current` value becomes the new `previous` one.
const [embeddedBackPressureInfo, setEmbeddedBackPressureInfo] =
useState<EmbeddedBackPressureInfo>()
useEffect(() => {
if (backPressureDataSource === "Embedded") {
const interval = setInterval(() => {
fetchEmbeddedBackPressure().then(
(newBP) => {
console.log(newBP)
Copy link
Member

@yufansong yufansong Mar 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe remove the log?

Copy link
Member Author

@fuyufjh fuyufjh Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intended. It would help with debugging in some cases. The useFetch() also prints the log for responses.

setEmbeddedBackPressureInfo((prev) =>
prev
? {
previous: prev.current,
current: newBP,
}
: {
previous: newBP, // Use current value to show zero rate, but it's fine
current: newBP,
}
)
},
(e) => {
console.error(e)
toast(e, "error")
}
)
}, INTERVAL)
return () => {
clearInterval(interval)
}
}
}, [backPressureDataSource])

const backPressures = useMemo(() => {
if (actorBackPressures || backPressuresMetricsWithoutPromtheus) {
if (promethusMetrics || embeddedBackPressureInfo) {
let map = new Map()

if (
backPressureDataSource === "Embedded" &&
backPressuresMetricsWithoutPromtheus
) {
for (const m of backPressuresMetricsWithoutPromtheus.outputBufferBlockingDuration) {
if (backPressureDataSource === "Embedded" && embeddedBackPressureInfo) {
const metrics = calculateBPRate(
embeddedBackPressureInfo.current,
embeddedBackPressureInfo.previous,
INTERVAL
)
for (const m of metrics.outputBufferBlockingDuration) {
map.set(
`${m.metric.fragmentId}_${m.metric.downstreamFragmentId}`,
m.sample[0].value
)
}
} else if (
backPressureDataSource === "Prometheus" &&
actorBackPressures
) {
if (actorBackPressures) {
for (const m of actorBackPressures.outputBufferBlockingDuration) {
if (m.sample.length > 0) {
// Note: We issue an instant query to Prometheus to get the most recent value.
// So there should be only one sample here.
//
// Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
// possible that an old version of meta service returns a range-query result.
// So we take the one with the latest timestamp here.
const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
} else if (backPressureDataSource === "Prometheus" && promethusMetrics) {
for (const m of promethusMetrics.outputBufferBlockingDuration) {
if (m.sample.length > 0) {
// Note: We issue an instant query to Prometheus to get the most recent value.
// So there should be only one sample here.
//
// Due to https://github.com/risingwavelabs/risingwave/issues/15280, it's still
// possible that an old version of meta service returns a range-query result.
// So we take the one with the latest timestamp here.
const value = _(m.sample).maxBy((s) => s.timestamp)!.value * 100
map.set(
`${m.metric.fragment_id}_${m.metric.downstream_fragment_id}`,
value
)
}
}
}
return map
}
}, [
backPressureDataSource,
actorBackPressures,
backPressuresMetricsWithoutPromtheus,
])
}, [backPressureDataSource, promethusMetrics, embeddedBackPressureInfo])

const retVal = (
<Flex p={3} height="calc(100vh - 20px)" flexDirection="column">
Expand Down Expand Up @@ -444,12 +445,14 @@ export default function Streaming() {
event.target.value as BackPressureDataSource
)
}
defaultValue="Embedded"
>
{backPressureDataSources.map((algo) => (
<option value={algo} key={algo}>
{algo}
</option>
))}
<option value="Embedded" key="Embedded">
Embedded (5 secs)
</option>
<option value="Prometheus" key="Prometheus">
Prometheus (1 min)
</option>
</Select>
</FormControl>
<Flex height="full" width="full" flexDirection="column">
Expand Down
11 changes: 11 additions & 0 deletions src/meta/src/dashboard/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ impl From<&InstantVector> for PrometheusVector {
}
}

// Note(eric): For backward compatibility, we store the `InstantVector` as a single sample,
// instead of defining a new struct.
impl From<&InstantVector> for PrometheusVector {
fuyufjh marked this conversation as resolved.
Show resolved Hide resolved
fn from(value: &InstantVector) -> Self {
PrometheusVector {
metric: value.metric().clone(),
sample: vec![PrometheusSample::from(value.sample())],
}
}
}

#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ClusterMetrics {
Expand Down
Loading