Skip to content

Commit

Permalink
fix(metrics): reduce overhead of merge_barrier_alignment_duration (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz authored Jun 12, 2024
1 parent c491b46 commit c9c7064
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use std::task::{Context, Poll};

use anyhow::Context as _;
use futures::stream::{FusedStream, FuturesUnordered, StreamFuture};
use prometheus::Histogram;
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::LabelGuardedMetric;
use tokio::time::Instant;

use super::exchange::input::BoxedInput;
Expand Down Expand Up @@ -92,12 +95,24 @@ impl MergeExecutor {

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self: Box<Self>) {
let merge_barrier_align_duration = if self.metrics.level >= MetricLevel::Debug {
Some(
self.metrics
.merge_barrier_align_duration
.with_label_values(&[
&self.actor_context.id.to_string(),
&self.actor_context.fragment_id.to_string(),
]),
)
} else {
None
};

// Futures of all active upstreams.
let select_all = SelectReceivers::new(
self.actor_context.id,
self.actor_context.fragment_id,
self.upstreams,
self.metrics.clone(),
merge_barrier_align_duration.clone(),
);
let actor_id = self.actor_context.id;

Expand Down Expand Up @@ -189,9 +204,8 @@ impl MergeExecutor {
// the one we polled from original upstreams.
let mut select_new = SelectReceivers::new(
self.actor_context.id,
self.fragment_id,
new_upstreams,
self.metrics.clone(),
merge_barrier_align_duration.clone(),
);
let new_barrier = expect_first_barrier(&mut select_new).await?;
assert_eq!(barrier, &new_barrier);
Expand Down Expand Up @@ -256,12 +270,10 @@ pub struct SelectReceivers {

/// The actor id of this fragment.
actor_id: u32,
/// The fragment id
fragment_id: u32,
/// watermark column index -> `BufferedWatermarks`
buffered_watermarks: BTreeMap<usize, BufferedWatermarks<ActorId>>,
/// Streaming Metrics
metrics: Arc<StreamingMetrics>,
/// If `None`, `poll_next` skips both the `Instant::now()` call and the `observe` call.
merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram, 2>>,
}

impl Stream for SelectReceivers {
Expand All @@ -274,10 +286,6 @@ impl Stream for SelectReceivers {
return Poll::Ready(None);
}

let merge_barrier_align_duration = self
.metrics
.merge_barrier_align_duration
.with_label_values(&[&self.actor_id.to_string(), &self.fragment_id.to_string()]);
let mut start = None;
loop {
match futures::ready!(self.active.poll_next_unpin(cx)) {
Expand All @@ -303,7 +311,9 @@ impl Stream for SelectReceivers {
}
Message::Barrier(barrier) => {
// Block this upstream by pushing it to `blocked`.
if self.blocked.is_empty() {
if self.blocked.is_empty()
&& self.merge_barrier_align_duration.is_some()
{
start = Some(Instant::now());
}
self.blocked.push(remaining);
Expand Down Expand Up @@ -332,7 +342,11 @@ impl Stream for SelectReceivers {
Some((None, _)) => unreachable!(),
// There are no active upstreams. Process the barrier and resume the blocked ones.
None => {
if let Some(start) = start {
if let Some(start) = start
&& let Some(merge_barrier_align_duration) =
&self.merge_barrier_align_duration
{
// `observe` performs a few atomic operations internally; we want to avoid that overhead.
merge_barrier_align_duration.observe(start.elapsed().as_secs_f64())
}
break;
Expand Down Expand Up @@ -360,21 +374,19 @@ impl Stream for SelectReceivers {
impl SelectReceivers {
fn new(
actor_id: u32,
fragment_id: u32,
upstreams: Vec<BoxedInput>,
metrics: Arc<StreamingMetrics>,
merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram, 2>>,
) -> Self {
assert!(!upstreams.is_empty());
let upstream_actor_ids = upstreams.iter().map(|input| input.actor_id()).collect();
let mut this = Self {
blocked: Vec::with_capacity(upstreams.len()),
active: Default::default(),
actor_id,
fragment_id,
barrier: None,
upstream_actor_ids,
buffered_watermarks: Default::default(),
metrics,
merge_barrier_align_duration,
};
this.extend_active(upstreams);
this
Expand Down

0 comments on commit c9c7064

Please sign in to comment.