refactor(metrics): remove actor_id label from back-pressure metrics based on metrics level #18213

Merged
8 commits, merged Aug 29, 2024
Changes from 6 commits
38 changes: 7 additions & 31 deletions dashboard/lib/api/metric.ts
@@ -15,7 +15,10 @@
*
*/
import { Metrics, MetricsSample } from "../../components/metrics"
import { GetBackPressureResponse } from "../../proto/gen/monitor_service"
import {
BackPressureInfo,
GetBackPressureResponse,
} from "../../proto/gen/monitor_service"
import api from "./api"

export interface BackPressuresMetrics {
@@ -30,13 +30,6 @@ export async function fetchPrometheusBackPressure() {
return res
}

export interface BackPressureInfo {
actorId: number
fragmentId: number
downstreamFragmentId: number
value: number
}

export interface BackPressureRateInfo {
actorId: number
fragmentId: number
@@ -55,14 +51,8 @@ function convertToMapAndAgg(
const map = new Map<string, number>()
for (const item of backPressures) {
const key = `${item.fragmentId}-${item.downstreamFragmentId}`
if (mapValue.has(key) && mapNumber.has(key)) {
// add || tp avoid NaN and pass check
mapValue.set(key, (mapValue.get(key) || 0) + item.value)
mapNumber.set(key, (mapNumber.get(key) || 0) + 1)
} else {
mapValue.set(key, item.value)
mapNumber.set(key, 1)
}
mapValue.set(key, (mapValue.get(key) || 0) + item.value)
mapNumber.set(key, (mapNumber.get(key) || 0) + item.dispatcherCount)
}

for (const [key, value] of mapValue) {
@@ -137,7 +127,7 @@ export function calculateCumulativeBp(
mapResult.forEach((value, key) => {
const [fragmentId, downstreamFragmentId] = key.split("-").map(Number)
const backPressureInfo: BackPressureInfo = {
actorId: 0,
dispatcherCount: 1, // the value here has already been averaged by real dispatcher count
fragmentId,
downstreamFragmentId,
value,
@@ -161,27 +151,13 @@ export function calculateBPRate(
return convertToBackPressureMetrics(convertFromMapAndAgg(result))
}

export const BackPressureInfo = {
fromJSON: (object: any) => {
return {
actorId: isSet(object.actorId) ? Number(object.actorId) : 0,
fragmentId: isSet(object.fragmentId) ? Number(object.fragmentId) : 0,
downstreamFragmentId: isSet(object.downstreamFragmentId)
? Number(object.downstreamFragmentId)
: 0,
value: isSet(object.value) ? Number(object.value) : 0,
}
},
}

// Get back pressure from meta node -> compute node
export async function fetchEmbeddedBackPressure() {
const response: GetBackPressureResponse = await api.get(
"/metrics/fragment/embedded_back_pressures"
)
let backPressureInfos: BackPressureInfo[] =
response.backPressureInfos?.map(BackPressureInfo.fromJSON) ?? []
backPressureInfos = backPressureInfos.sort((a, b) => a.actorId - b.actorId)
return backPressureInfos
}
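
For context on the dashboard math: convertToMapAndAgg now sums blocking durations and dispatcher counts per (fragmentId, downstreamFragmentId) edge and divides once at the end, and calculateCumulativeBp stores the already-averaged result back with dispatcherCount: 1 so later passes do not divide again. A minimal Rust sketch of the same per-edge averaging (standalone and illustrative, not this repo's generated types):

```rust
use std::collections::HashMap;

// Illustrative stand-in for the proto message; field meanings match the PR.
struct BackPressureInfo {
    fragment_id: u32,
    downstream_fragment_id: u32,
    dispatcher_count: u32,
    value: f64, // cumulative output-buffer blocking time, in nanoseconds
}

/// Sum `value` and `dispatcher_count` per (fragment_id, downstream_fragment_id)
/// edge, then divide once to get the per-dispatcher average for that edge.
fn convert_to_map_and_agg(infos: &[BackPressureInfo]) -> HashMap<(u32, u32), f64> {
    let mut acc: HashMap<(u32, u32), (f64, u32)> = HashMap::new();
    for info in infos {
        let entry = acc
            .entry((info.fragment_id, info.downstream_fragment_id))
            .or_insert((0.0, 0));
        entry.0 += info.value;
        entry.1 += info.dispatcher_count;
    }
    acc.into_iter()
        .map(|(edge, (sum, count))| (edge, sum / count.max(1) as f64))
        .collect()
}
```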

2 changes: 1 addition & 1 deletion dashboard/pages/fragment_graph.tsx
@@ -39,7 +39,6 @@ import Title from "../components/Title"
import useErrorToast from "../hook/useErrorToast"
import useFetch from "../lib/api/fetch"
import {
BackPressureInfo,
calculateBPRate,
calculateCumulativeBp,
fetchEmbeddedBackPressure,
@@ -52,6 +51,7 @@ import {
} from "../lib/api/streaming"
import { FragmentBox } from "../lib/layout"
import { TableFragments, TableFragments_Fragment } from "../proto/gen/meta"
import { BackPressureInfo } from "../proto/gen/monitor_service"
import { Dispatcher, MergeNode, StreamNode } from "../proto/gen/stream_plan"

interface DispatcherNode {
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
@@ -1134,7 +1134,9 @@ def section_streaming_actors(outer_panels):
"on average. Then we divide this duration by 1 second and show it as a percentage.",
[
panels.target(
f"avg(rate({metric('stream_actor_output_buffer_blocking_duration_ns')}[$__rate_interval])) by (fragment_id, downstream_fragment_id) / 1000000000",
f"sum(rate({metric('stream_actor_output_buffer_blocking_duration_ns')}[$__rate_interval])) by (fragment_id, downstream_fragment_id) \
/ sum({metric('stream_dispatcher_count')}) by (fragment_id, downstream_fragment_id) \
/ 1000000000",
"fragment {{fragment_id}}->{{downstream_fragment_id}}",
),
],
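
Read as a formula, the new panel expression averages blocking time per dispatcher rather than per actor (a paraphrase of the query above, not an authoritative definition):

$$ \text{backpressure ratio} = \frac{\sum \operatorname{rate}\big(\texttt{stream\_actor\_output\_buffer\_blocking\_duration\_ns}\big)}{\texttt{stream\_dispatcher\_count} \times 10^{9}} $$

with both sums grouped by (fragment_id, downstream_fragment_id). For example, if an edge has two dispatchers blocked for 0.5 s and 1.0 s out of every second, the numerator is 1.5e9 ns/s and the ratio is 1.5e9 / (2 * 1e9) = 0.75, i.e. 75% back-pressured on average.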
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions proto/monitor_service.proto
@@ -55,9 +55,9 @@
message GetBackPressureRequest {}

message BackPressureInfo {
uint32 actor_id = 1;
uint32 fragment_id = 2;
uint32 downstream_fragment_id = 3;
uint32 fragment_id = 1;

Check failure on line 58 in proto/monitor_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "1" with name "fragment_id" on message "BackPressureInfo" changed option "json_name" from "actorId" to "fragmentId".
Field "1" on message "BackPressureInfo" changed name from "actor_id" to "fragment_id".
uint32 downstream_fragment_id = 2;

Check failure on line 59 in proto/monitor_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "2" with name "downstream_fragment_id" on message "BackPressureInfo" changed option "json_name" from "fragmentId" to "downstreamFragmentId".
Field "2" on message "BackPressureInfo" changed name from "fragment_id" to "downstream_fragment_id".
uint32 dispatcher_count = 3;

Check failure on line 60 in proto/monitor_service.proto (GitHub Actions / Check breaking changes in Protobuf files):
Field "3" with name "dispatcher_count" on message "BackPressureInfo" changed option "json_name" from "downstreamFragmentId" to "dispatcherCount".
Field "3" on message "BackPressureInfo" changed name from "downstream_fragment_id" to "dispatcher_count".
double value = 4;
}
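
These buf failures are expected: the PR deliberately reuses field numbers 1-3 under new names, which changes both the wire-level meaning of each field and the generated JSON keys (field 1 now serializes as "fragmentId" instead of "actorId"). A toy serde illustration of the JSON hazard for a consumer still decoding the old shape (hypothetical code, not from this repo):

```rust
use serde::Deserialize;

// An old client's view of the message: it still looks for "actorId".
#[derive(Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
struct OldBackPressureInfo {
    actor_id: u32,
}

fn main() {
    // The new server emits "fragmentId" for field 1, so the old client
    // silently falls back to the default value instead of failing loudly.
    let json = r#"{"fragmentId": 7, "downstreamFragmentId": 8, "dispatcherCount": 2, "value": 1.5}"#;
    let old: OldBackPressureInfo = serde_json::from_str(json).unwrap();
    assert_eq!(old.actor_id, 0); // the actor id is simply gone
}
```

Since the dashboard in this repo is updated in the same PR (see dashboard/lib/api/metric.ts above), the breakage appears to be accepted rather than worked around.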

13 changes: 12 additions & 1 deletion src/common/metrics/src/relabeled_metric.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::core::{MetricVec, MetricVecBuilder};
use prometheus::core::{Collector, MetricVec, MetricVecBuilder};
use prometheus::{HistogramVec, IntCounterVec};

use crate::{
@@ -89,6 +89,7 @@ impl<T: MetricVecBuilder> RelabeledMetricVec<MetricVec<T>> {
}

impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricVec<T, N>> {
// TODO: shall we rename this to `with_guarded_label_values`?
pub fn with_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
if self.metric_level > self.relabel_threshold {
// relabel first n labels to empty string
@@ -102,6 +103,16 @@ impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricVec<T, N>> {
}
}

impl<T: Collector> Collector for RelabeledMetricVec<T> {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
self.metric.desc()
}

fn collect(&self) -> Vec<prometheus::proto::MetricFamily> {
self.metric.collect()
}
}

pub type RelabeledCounterVec = RelabeledMetricVec<IntCounterVec>;
pub type RelabeledHistogramVec = RelabeledMetricVec<HistogramVec>;
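
The new Collector impl simply forwards desc() and collect() to the inner vec; the payoff is that a RelabeledMetricVec can now be registered with a prometheus Registry like any other collector. A minimal sketch of the delegation pattern (generic wrapper and demo metric are illustrative, not this crate's API):

```rust
use prometheus::core::{Collector, Desc};
use prometheus::proto::MetricFamily;
use prometheus::{IntCounterVec, Opts, Registry};

// A wrapper that delegates Collector to its inner metric, mirroring the
// RelabeledMetricVec impl above.
struct Wrapper<T> {
    inner: T,
}

impl<T: Collector> Collector for Wrapper<T> {
    fn desc(&self) -> Vec<&Desc> {
        self.inner.desc()
    }

    fn collect(&self) -> Vec<MetricFamily> {
        self.inner.collect()
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let vec = IntCounterVec::new(Opts::new("demo_total", "demo counter"), &["label"])?;
    let wrapped = Wrapper { inner: vec };
    // Possible only because `Wrapper` implements `Collector`.
    Registry::new().register(Box::new(wrapped))?;
    Ok(())
}
```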

53 changes: 46 additions & 7 deletions src/compute/src/rpc/service/monitor_service.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::ffi::CString;
use std::fs;
use std::path::Path;
@@ -291,17 +291,49 @@ impl MonitorService for MonitorServiceImpl {
&self,
_request: Request<GetBackPressureRequest>,
) -> Result<Response<GetBackPressureResponse>, Status> {
let metric_family = global_streaming_metrics(MetricLevel::Info)
let metrics = global_streaming_metrics(MetricLevel::Info);
let actor_output_buffer_blocking_duration_ns = metrics
.actor_output_buffer_blocking_duration_ns
.collect()
.into_iter()
.next()
.unwrap()
.take_metric();
let dispatcher_count = metrics
.dispatcher_count
.collect()
.into_iter()
.next()
.unwrap()
.take_metric();

let dispatcher_count: HashMap<_, _> = dispatcher_count
.iter()
.filter_map(|m| {
let fragment_id = m
.get_label()
.iter()
.find(|lp| lp.get_name() == "fragment_id")?
.get_value()
.parse::<u32>()
.unwrap();
let downstream_fragment_id = m
.get_label()
.iter()
.find(|lp| lp.get_name() == "downstream_fragment_id")?
.get_value()
.parse::<u32>()
.unwrap();
let count = m.get_gauge().get_value() as u32;
Some(((fragment_id, downstream_fragment_id), count))
})
.collect();
let metrics = metric_family.get(0).unwrap().get_metric();

let mut back_pressure_infos: Vec<BackPressureInfo> = Vec::new();
for label_pairs in metrics {

for label_pairs in actor_output_buffer_blocking_duration_ns {
let mut back_pressure_info = BackPressureInfo::default();
for label_pair in label_pairs.get_label() {
if label_pair.get_name() == "actor_id" {
back_pressure_info.actor_id = label_pair.get_value().parse::<u32>().unwrap();
}
if label_pair.get_name() == "fragment_id" {
back_pressure_info.fragment_id = label_pair.get_value().parse::<u32>().unwrap();
}
Expand All @@ -311,6 +343,13 @@ impl MonitorService for MonitorServiceImpl {
}
}
back_pressure_info.value = label_pairs.get_counter().get_value();
back_pressure_info.dispatcher_count = dispatcher_count
.get(&(
back_pressure_info.fragment_id,
back_pressure_info.downstream_fragment_id,
))
.copied()
.unwrap_or_default();
back_pressure_infos.push(back_pressure_info);
}

9 changes: 7 additions & 2 deletions src/meta/src/dashboard/prometheus.rs
@@ -146,8 +146,13 @@ pub async fn list_prometheus_fragment_back_pressure(
Extension(srv): Extension<Service>,
) -> Result<Json<FragmentBackPressure>> {
if let Some(ref client) = srv.prometheus_client {
let back_pressure_query =
format!("avg(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) / 1000000000", srv.prometheus_selector);
let back_pressure_query = format!(
"sum(rate(stream_actor_output_buffer_blocking_duration_ns{{{}}}[60s])) by (fragment_id, downstream_fragment_id) \
/ sum(stream_dispatcher_count{{{}}}) by (fragment_id, downstream_fragment_id) \
/ 1000000000",
srv.prometheus_selector,
srv.prometheus_selector,
);
let result = client.query(back_pressure_query).get().await.map_err(err)?;
let back_pressure_data = result
.data()
46 changes: 31 additions & 15 deletions src/stream/src/executor/dispatch.rs
@@ -16,13 +16,14 @@ use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter::repeat_with;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::hash::{ActorMapping, ExpandedActorMapping, VirtualNode};
use risingwave_common::metrics::LabelGuardedIntCounter;
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
use risingwave_pb::stream_plan::PbDispatcher;
@@ -49,6 +50,14 @@ pub struct DispatchExecutor {
struct DispatcherWithMetrics {
dispatcher: DispatcherImpl,
actor_output_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>,
dispatcher_count: LabelGuardedIntGauge<2>,
}

impl DispatcherWithMetrics {
pub fn record_output_buffer_blocking_duration(&self, duration: Duration) {
let ns = duration.as_nanos() as u64;
self.actor_output_buffer_blocking_duration_ns.inc_by(ns);
}
}

impl Debug for DispatcherWithMetrics {
@@ -71,24 +80,37 @@ impl DerefMut for DispatcherWithMetrics {
}
}

struct DispatcherMetrics {
impl Drop for DispatcherWithMetrics {
fn drop(&mut self) {
self.dispatcher_count.dec();
}
}

struct DispatchExecutorMetrics {
actor_id_str: String,
fragment_id_str: String,
metrics: Arc<StreamingMetrics>,
actor_out_record_cnt: LabelGuardedIntCounter<2>,
}

impl DispatcherMetrics {
impl DispatchExecutorMetrics {
fn monitor_dispatcher(&self, dispatcher: DispatcherImpl) -> DispatcherWithMetrics {
let dispatcher_count = self
.metrics
.dispatcher_count
.with_guarded_label_values(&[&self.fragment_id_str, dispatcher.dispatcher_id_str()]);
dispatcher_count.inc();

DispatcherWithMetrics {
actor_output_buffer_blocking_duration_ns: self
.metrics
.actor_output_buffer_blocking_duration_ns
.with_guarded_label_values(&[
.with_label_values(&[
&self.actor_id_str,
&self.fragment_id_str,
dispatcher.dispatcher_id_str(),
]),
dispatcher_count,
dispatcher,
}
}
@@ -98,7 +120,7 @@ struct DispatchExecutorInner {
dispatchers: Vec<DispatcherWithMetrics>,
actor_id: u32,
context: Arc<SharedContext>,
metrics: DispatcherMetrics,
metrics: DispatchExecutorMetrics,
}

impl DispatchExecutorInner {
@@ -112,9 +134,7 @@ impl DispatchExecutorInner {
.try_for_each_concurrent(limit, |dispatcher| async {
let start_time = Instant::now();
dispatcher.dispatch_watermark(watermark.clone()).await?;
dispatcher
.actor_output_buffer_blocking_duration_ns
.inc_by(start_time.elapsed().as_nanos() as u64);
dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
StreamResult::Ok(())
})
.await?;
@@ -125,9 +145,7 @@
.try_for_each_concurrent(limit, |dispatcher| async {
let start_time = Instant::now();
dispatcher.dispatch_data(chunk.clone()).await?;
dispatcher
.actor_output_buffer_blocking_duration_ns
.inc_by(start_time.elapsed().as_nanos() as u64);
dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
StreamResult::Ok(())
})
.await?;
@@ -147,9 +165,7 @@ impl DispatchExecutorInner {
dispatcher
.dispatch_barrier(barrier.clone().into_dispatcher())
.await?;
dispatcher
.actor_output_buffer_blocking_duration_ns
.inc_by(start_time.elapsed().as_nanos() as u64);
dispatcher.record_output_buffer_blocking_duration(start_time.elapsed());
StreamResult::Ok(())
})
.await?;
@@ -361,7 +377,7 @@ impl DispatchExecutor {
let actor_out_record_cnt = metrics
.actor_out_record_cnt
.with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
let metrics = DispatcherMetrics {
let metrics = DispatchExecutorMetrics {
actor_id_str,
fragment_id_str,
metrics,
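
One detail worth noting in the diff above: dispatcher_count is maintained RAII-style, incremented in monitor_dispatcher when the wrapper is built and decremented in DispatcherWithMetrics's Drop impl, so the gauge tracks live dispatchers even when they are torn down on error paths. A standalone sketch of the pattern (illustrative names, assuming only the prometheus crate):

```rust
use prometheus::{IntGauge, Opts};

/// Ties a gauge to an object's lifetime: inc on construction, dec on drop.
struct GaugeGuard {
    gauge: IntGauge,
}

impl GaugeGuard {
    fn new(gauge: IntGauge) -> Self {
        gauge.inc();
        Self { gauge }
    }
}

impl Drop for GaugeGuard {
    fn drop(&mut self) {
        self.gauge.dec();
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let gauge = IntGauge::with_opts(Opts::new("live_dispatchers", "demo gauge"))?;
    let guard = GaugeGuard::new(gauge.clone());
    assert_eq!(gauge.get(), 1);
    drop(guard); // decrements even on early returns or unwinding
    assert_eq!(gauge.get(), 0);
    Ok(())
}
```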