From c9c706415b52083205a91b228ae53a4bc662e97d Mon Sep 17 00:00:00 2001 From: lmatz Date: Wed, 12 Jun 2024 18:09:17 +0800 Subject: [PATCH] fix(metrics): reduce overhead of `merge_barrier_alignment_duration` (#17222) --- src/stream/src/executor/merge.rs | 48 ++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 8b1daa9695c4d..19124fe8c22d4 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -18,6 +18,9 @@ use std::task::{Context, Poll}; use anyhow::Context as _; use futures::stream::{FusedStream, FuturesUnordered, StreamFuture}; +use prometheus::Histogram; +use risingwave_common::config::MetricLevel; +use risingwave_common::metrics::LabelGuardedMetric; use tokio::time::Instant; use super::exchange::input::BoxedInput; @@ -92,12 +95,24 @@ impl MergeExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self: Box) { + let merge_barrier_align_duration = if self.metrics.level >= MetricLevel::Debug { + Some( + self.metrics + .merge_barrier_align_duration + .with_label_values(&[ + &self.actor_context.id.to_string(), + &self.actor_context.fragment_id.to_string(), + ]), + ) + } else { + None + }; + // Futures of all active upstreams. let select_all = SelectReceivers::new( self.actor_context.id, - self.actor_context.fragment_id, self.upstreams, - self.metrics.clone(), + merge_barrier_align_duration.clone(), ); let actor_id = self.actor_context.id; @@ -189,9 +204,8 @@ impl MergeExecutor { // the one we polled from original upstreams. let mut select_new = SelectReceivers::new( self.actor_context.id, - self.fragment_id, new_upstreams, - self.metrics.clone(), + merge_barrier_align_duration.clone(), ); let new_barrier = expect_first_barrier(&mut select_new).await?; assert_eq!(barrier, &new_barrier); @@ -256,12 +270,10 @@ pub struct SelectReceivers { /// The actor id of this fragment. actor_id: u32, - /// The fragment id - fragment_id: u32, /// watermark column index -> `BufferedWatermarks` buffered_watermarks: BTreeMap>, - /// Streaming Metrics - metrics: Arc, + /// If None, then we don't take `Instant::now()` and `observe` during `poll_next` + merge_barrier_align_duration: Option>, } impl Stream for SelectReceivers { @@ -274,10 +286,6 @@ impl Stream for SelectReceivers { return Poll::Ready(None); } - let merge_barrier_align_duration = self - .metrics - .merge_barrier_align_duration - .with_label_values(&[&self.actor_id.to_string(), &self.fragment_id.to_string()]); let mut start = None; loop { match futures::ready!(self.active.poll_next_unpin(cx)) { @@ -303,7 +311,9 @@ impl Stream for SelectReceivers { } Message::Barrier(barrier) => { // Block this upstream by pushing it to `blocked`. - if self.blocked.is_empty() { + if self.blocked.is_empty() + && self.merge_barrier_align_duration.is_some() + { start = Some(Instant::now()); } self.blocked.push(remaining); @@ -332,7 +342,11 @@ impl Stream for SelectReceivers { Some((None, _)) => unreachable!(), // There's no active upstreams. Process the barrier and resume the blocked ones. None => { - if let Some(start) = start { + if let Some(start) = start + && let Some(merge_barrier_align_duration) = + &self.merge_barrier_align_duration + { + // Observe did a few atomic operation inside, we want to avoid the overhead. merge_barrier_align_duration.observe(start.elapsed().as_secs_f64()) } break; @@ -360,9 +374,8 @@ impl Stream for SelectReceivers { impl SelectReceivers { fn new( actor_id: u32, - fragment_id: u32, upstreams: Vec, - metrics: Arc, + merge_barrier_align_duration: Option>, ) -> Self { assert!(!upstreams.is_empty()); let upstream_actor_ids = upstreams.iter().map(|input| input.actor_id()).collect(); @@ -370,11 +383,10 @@ impl SelectReceivers { blocked: Vec::with_capacity(upstreams.len()), active: Default::default(), actor_id, - fragment_id, barrier: None, upstream_actor_ids, buffered_watermarks: Default::default(), - metrics, + merge_barrier_align_duration, }; this.extend_active(upstreams); this