Skip to content

Commit

Permalink
refactor(metrics): remove actor_id label from back-pressure metrics…
Browse files Browse the repository at this point in the history
… based on metrics level (#18213)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Aug 29, 2024
1 parent a5cbeb7 commit edb1493
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 76 deletions.
38 changes: 7 additions & 31 deletions dashboard/lib/api/metric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*
*/
import { Metrics, MetricsSample } from "../../components/metrics"
import { GetBackPressureResponse } from "../../proto/gen/monitor_service"
import {
BackPressureInfo,
GetBackPressureResponse,
} from "../../proto/gen/monitor_service"
import api from "./api"

export interface BackPressuresMetrics {
Expand All @@ -30,13 +33,6 @@ export async function fetchPrometheusBackPressure() {
return res
}

export interface BackPressureInfo {
actorId: number
fragmentId: number
downstreamFragmentId: number
value: number
}

export interface BackPressureRateInfo {
actorId: number
fragmentId: number
Expand All @@ -55,14 +51,8 @@ function convertToMapAndAgg(
const map = new Map<string, number>()
for (const item of backPressures) {
const key = `${item.fragmentId}-${item.downstreamFragmentId}`
if (mapValue.has(key) && mapNumber.has(key)) {
// add || tp avoid NaN and pass check
mapValue.set(key, (mapValue.get(key) || 0) + item.value)
mapNumber.set(key, (mapNumber.get(key) || 0) + 1)
} else {
mapValue.set(key, item.value)
mapNumber.set(key, 1)
}
mapValue.set(key, (mapValue.get(key) || 0) + item.value)
mapNumber.set(key, (mapNumber.get(key) || 0) + item.actorCount)
}

for (const [key, value] of mapValue) {
Expand Down Expand Up @@ -137,7 +127,7 @@ export function calculateCumulativeBp(
mapResult.forEach((value, key) => {
const [fragmentId, downstreamFragmentId] = key.split("-").map(Number)
const backPressureInfo: BackPressureInfo = {
actorId: 0,
actorCount: 1, // the value here has already been averaged by real actor count
fragmentId,
downstreamFragmentId,
value,
Expand All @@ -161,27 +151,13 @@ export function calculateBPRate(
return convertToBackPressureMetrics(convertFromMapAndAgg(result))
}

export const BackPressureInfo = {
fromJSON: (object: any) => {
return {
actorId: isSet(object.actorId) ? Number(object.actorId) : 0,
fragmentId: isSet(object.fragmentId) ? Number(object.fragmentId) : 0,
downstreamFragmentId: isSet(object.downstreamFragmentId)
? Number(object.downstreamFragmentId)
: 0,
value: isSet(object.value) ? Number(object.value) : 0,
}
},
}

// Get back pressure from meta node -> compute node
export async function fetchEmbeddedBackPressure() {
const response: GetBackPressureResponse = await api.get(
"/metrics/fragment/embedded_back_pressures"
)
let backPressureInfos: BackPressureInfo[] =
response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? []
backPressureInfos = backPressureInfos.sort((a, b) => a.actorId - b.actorId)
return backPressureInfos
}

Expand Down
2 changes: 1 addition & 1 deletion dashboard/pages/fragment_graph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import Title from "../components/Title"
import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
calculateBPRate,
calculateCumulativeBp,
fetchEmbeddedBackPressure,
Expand All @@ -52,6 +51,7 @@ import {
} from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import { BackPressureInfo } from "../proto/gen/monitor_service"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"

interface DispatcherNode {
Expand Down
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,12 @@ def section_streaming_actors(outer_panels):
"much time it takes an actor to process a message, i.e. a barrier, a watermark or rows of data, "
"on average. Then we divide this duration by 1 second and show it as a percentage.",
[
# Note: actor_count is equal to the number of dispatchers for a given downstream fragment,
# this holds true as long as we don't support multiple edges between two fragments.
panels.target(
f"avg(rate({metric('stream_actor_output_buffer_blocking_duration_ns')}[$__rate_interval])) by (fragment_id, downstream_fragment_id) / 1000000000",
f"sum(rate({metric('stream_actor_output_buffer_blocking_duration_ns')}[$__rate_interval])) by (fragment_id, downstream_fragment_id) \
/ ignoring (downstream_fragment_id) group_left sum({metric('stream_actor_count')}) by (fragment_id) \
/ 1000000000",
"fragment {{fragment_id}}->{{downstream_fragment_id}}",
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ message AnalyzeHeapResponse {
message GetBackPressureRequest {}

message BackPressureInfo {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 downstream_fragment_id = 3;
uint32 fragment_id = 1;
uint32 downstream_fragment_id = 2;
uint32 actor_count = 3;
double value = 4;
}

Expand Down
41 changes: 41 additions & 0 deletions src/common/metrics/src/gauge_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::IntGauge;

#[easy_ext::ext(IntGaugeExt)]
impl IntGauge {
/// Increment the gauge, and return a guard that will decrement the gauge when dropped.
#[must_use]
pub fn inc_guard(&self) -> impl Drop + '_ {
struct Guard<'a> {
gauge: &'a IntGauge,
}

impl<'a> Guard<'a> {
fn create(gauge: &'a IntGauge) -> Self {
gauge.inc();
Self { gauge }
}
}

impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
self.gauge.dec();
}
}

Guard::create(self)
}
}
2 changes: 2 additions & 0 deletions src/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;

mod error_metrics;
mod gauge_ext;
mod guarded_metrics;
mod metrics;
pub mod monitor;
mod relabeled_metric;

pub use error_metrics::*;
pub use gauge_ext::*;
pub use guarded_metrics::*;
pub use metrics::*;
pub use relabeled_metric::*;
Expand Down
13 changes: 12 additions & 1 deletion src/common/metrics/src/relabeled_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::core::{MetricVec, MetricVecBuilder};
use prometheus::core::{Collector, MetricVec, MetricVecBuilder};
use prometheus::{HistogramVec, IntCounterVec};

use crate::{
Expand Down Expand Up @@ -89,6 +89,7 @@ impl<T: MetricVecBuilder> RelabeledMetricVec<MetricVec<T>> {
}

impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricVec<T, N>> {
// TODO: shall we rename this to `with_guarded_label_values`?
pub fn with_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
if self.metric_level > self.relabel_threshold {
// relabel first n labels to empty string
Expand All @@ -102,6 +103,16 @@ impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricV
}
}

impl<T: Collector> Collector for RelabeledMetricVec<T> {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
self.metric.desc()
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
self.metric.collect()
}
}

pub type RelabeledCounterVec = RelabeledMetricVec<IntCounterVec>;
pub type RelabeledHistogramVec = RelabeledMetricVec<HistogramVec>;

Expand Down
74 changes: 59 additions & 15 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::ffi::CString;
use std::fs;
use std::path::Path;
Expand Down Expand Up @@ -291,31 +291,75 @@ impl MonitorService for MonitorServiceImpl {
&self,
_request: Request<GetBackPressureRequest>,
) -> Result<Response<GetBackPressureResponse>, Status> {
let metric_family = global_streaming_metrics(MetricLevel::Info)
let metrics = global_streaming_metrics(MetricLevel::Info);
let actor_output_buffer_blocking_duration_ns = metrics
.actor_output_buffer_blocking_duration_ns
.collect()
.into_iter()
.next()
.unwrap()
.take_metric();
let actor_count = metrics
.actor_count
.collect()
.into_iter()
.next()
.unwrap()
.take_metric();

let actor_count: HashMap<_, _> = actor_count
.iter()
.filter_map(|m| {
let fragment_id = m
.get_label()
.iter()
.find(|lp| lp.get_name() == "fragment_id")?
.get_value()
.parse::<u32>()
.unwrap();
let count = m.get_gauge().get_value() as u32;
Some((fragment_id, count))
})
.collect();
let metrics = metric_family.get(0).unwrap().get_metric();
let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for label_pairs in metrics {
let mut back_pressure_info = BackPressureInfo::default();

let mut back_pressure_infos: HashMap<_, BackPressureInfo> = HashMap::new();

for label_pairs in actor_output_buffer_blocking_duration_ns {
let mut fragment_id = None;
let mut downstream_fragment_id = None;
for label_pair in label_pairs.get_label() {
if label_pair.get_name() == "actor_id" {
back_pressure_info.actor_id = label_pair.get_value().parse::<u32>().unwrap();
}
if label_pair.get_name() == "fragment_id" {
back_pressure_info.fragment_id = label_pair.get_value().parse::<u32>().unwrap();
fragment_id = label_pair.get_value().parse::<u32>().ok();
}
if label_pair.get_name() == "downstream_fragment_id" {
back_pressure_info.downstream_fragment_id =
label_pair.get_value().parse::<u32>().unwrap();
downstream_fragment_id = label_pair.get_value().parse::<u32>().ok();
}
}
back_pressure_info.value = label_pairs.get_counter().get_value();
back_pressure_infos.push(back_pressure_info);
let Some(fragment_id) = fragment_id else {
continue;
};
let Some(downstream_fragment_id) = downstream_fragment_id else {
continue;
};

// When metrics level is Debug, we may have multiple metrics with the same label pairs
// (fragment_id, downstream_fragment_id). We need to aggregate them locally.
//
// Metrics from different compute nodes should be aggregated by the caller.
let back_pressure_info = back_pressure_infos
.entry((fragment_id, downstream_fragment_id))
.or_insert_with(|| BackPressureInfo {
fragment_id,
downstream_fragment_id,
actor_count: actor_count.get(&fragment_id).copied().unwrap_or_default(),
value: 0.,
});

back_pressure_info.value += label_pairs.get_counter().get_value();
}

Ok(Response::new(GetBackPressureResponse {
back_pressure_infos,
back_pressure_infos: back_pressure_infos.into_values().collect(),
}))
}

Expand Down
9 changes: 7 additions & 2 deletions src/meta/src/dashboard/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ pub async fn list_prometheus_fragment_back_pressure(
Extension(srv): Extension<Service>,
) -> Result<Json<FragmentBackPressure>> {
if let Some(ref client) = srv.prometheus_client {
let back_pressure_query =
format!("avg(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) / 1000000000", srv.prometheus_selector);
let back_pressure_query = format!(
"sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) \
/ ignoring (downstream_fragment_id) group_left sum(stream_actor_count{{{}}}) by (fragment_id) \
/ 1000000000",
srv.prometheus_selector,
srv.prometheus_selector,
);
let result = client.query(back_pressure_query).get().await.map_err(err)?;
let back_pressure_data = result
.data()
Expand Down
10 changes: 8 additions & 2 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use hytra::TrAdder;
use risingwave_common::catalog::TableId;
use risingwave_common::config::StreamingConfig;
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::metrics::{IntGaugeExt, GLOBAL_ERROR_METRICS};
use risingwave_common::util::epoch::EpochPair;
use risingwave_expr::expr_context::{expr_context_scope, FRAGMENT_ID};
use risingwave_expr::ExprError;
Expand Down Expand Up @@ -198,7 +198,6 @@ where
.into()));

let id = self.actor_context.id;

let span_name = format!("Actor {id}");

let new_span = |epoch: Option<EpochPair>| {
Expand All @@ -213,6 +212,13 @@ where
};
let mut span = new_span(None);

let actor_count = self
.actor_context
.streaming_metrics
.actor_count
.with_guarded_label_values(&[&self.actor_context.fragment_id.to_string()]);
let _actor_count_guard = actor_count.inc_guard();

let mut last_epoch: Option<EpochPair> = None;
let mut stream = Box::pin(Box::new(self.consumer).execute());

Expand Down
Loading

0 comments on commit edb1493

Please sign in to comment.