Skip to content

Commit

Permalink
fix: barrier align metrics for union and dynamic filter (#16826)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz authored May 21, 2024
1 parent 96e2002 commit 93646a5
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 30 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,19 +1204,19 @@ def section_streaming_actors(outer_panels):
],
),
panels.timeseries_actor_latency(
"Join Executor Barrier Align",
"Executor Barrier Align",
"",
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('stream_join_barrier_align_duration_bucket')}[$__rate_interval])) by (le, fragment_id, wait_side, {COMPONENT_LABEL}))",
f"p{legend} - fragment {{{{fragment_id}}}} {{{{wait_side}}}} - {{{{{COMPONENT_LABEL}}}}}",
f"histogram_quantile({quantile}, sum(rate({metric('stream_barrier_align_duration_bucket')}[$__rate_interval])) by (le, executor, fragment_id, wait_side, {COMPONENT_LABEL}))",
f"p{legend} - executor {{{{executor}}}} fragment {{{{fragment_id}}}} {{{{wait_side}}}} - {{{{{COMPONENT_LABEL}}}}}",
),
[90, 99, 999, "max"],
),
panels.target(
f"sum by(le, fragment_id, wait_side, job)(rate({metric('stream_join_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,fragment_id,wait_side,{COMPONENT_LABEL}) (rate({metric('stream_join_barrier_align_duration_count')}[$__rate_interval])) > 0",
"avg - fragment {{fragment_id}} {{wait_side}} - {{%s}}"
f"sum by(le, executor, fragment_id, wait_side, job)(rate({metric('stream_barrier_align_duration_sum')}[$__rate_interval])) / sum by(le,executor,fragment_id,wait_side,{COMPONENT_LABEL}) (rate({metric('stream_barrier_align_duration_count')}[$__rate_interval])) > 0",
"avg - executor {{executor}} fragment {{fragment_id}} {{wait_side}} - {{%s}}"
% COMPONENT_LABEL,
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

33 changes: 23 additions & 10 deletions src/stream/src/executor/barrier_align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,22 @@ pub async fn barrier_align(
actor_id: ActorId,
fragment_id: FragmentId,
metrics: Arc<StreamingMetrics>,
executor_name: &str,
) {
let actor_id = actor_id.to_string();
let fragment_id = fragment_id.to_string();
let left_join_barrier_align_duration = metrics
.join_barrier_align_duration
.with_label_values(&[&actor_id, &fragment_id, "left"]);
let right_join_barrier_align_duration = metrics
.join_barrier_align_duration
.with_label_values(&[&actor_id, &fragment_id, "right"]);
let left_barrier_align_duration = metrics.barrier_align_duration.with_label_values(&[
&actor_id,
&fragment_id,
"left",
executor_name,
]);
let right_barrier_align_duration = metrics.barrier_align_duration.with_label_values(&[
&actor_id,
&fragment_id,
"right",
executor_name,
]);
loop {
let prefer_left: bool = rand::random();
let select_result = if prefer_left {
Expand Down Expand Up @@ -113,7 +120,7 @@ pub async fn barrier_align(
Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
Message::Barrier(barrier) => {
yield AlignedMessage::Barrier(barrier);
right_join_barrier_align_duration
right_barrier_align_duration
.observe(start_time.elapsed().as_secs_f64());
break;
}
Expand All @@ -137,8 +144,7 @@ pub async fn barrier_align(
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
Message::Barrier(barrier) => {
yield AlignedMessage::Barrier(barrier);
left_join_barrier_align_duration
.observe(start_time.elapsed().as_secs_f64());
left_barrier_align_duration.observe(start_time.elapsed().as_secs_f64());
break;
}
}
Expand All @@ -164,7 +170,14 @@ mod tests {
left: BoxedMessageStream,
right: BoxedMessageStream,
) -> impl Stream<Item = Result<AlignedMessage, StreamExecutorError>> {
barrier_align(left, right, 0, 0, Arc::new(StreamingMetrics::unused()))
barrier_align(
left,
right,
0,
0,
Arc::new(StreamingMetrics::unused()),
"dummy_executor",
)
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, US
self.ctx.id,
self.ctx.fragment_id,
self.metrics.clone(),
"Dynamic Filter",
);

pin_mut!(aligned_stream);
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
self.ctx.id,
self.ctx.fragment_id,
self.metrics.clone(),
"Join",
);
pin_mut!(aligned_stream);

Expand Down
16 changes: 9 additions & 7 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ pub struct StreamingMetrics {
pub join_insert_cache_miss_count: LabelGuardedIntCounterVec<5>,
pub join_actor_input_waiting_duration_ns: LabelGuardedIntCounterVec<2>,
pub join_match_duration_ns: LabelGuardedIntCounterVec<3>,
pub join_barrier_align_duration: RelabeledGuardedHistogramVec<3>,
pub join_cached_entry_count: LabelGuardedIntGaugeVec<3>,
pub join_matched_join_keys: RelabeledGuardedHistogramVec<3>,

// Streaming Join, Streaming Dynamic Filter and Streaming Union
pub barrier_align_duration: RelabeledGuardedHistogramVec<4>,

// Streaming Aggregation
pub agg_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub agg_total_lookup_count: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -457,20 +459,20 @@ impl StreamingMetrics {
.unwrap();

let opts = histogram_opts!(
"stream_join_barrier_align_duration",
"stream_barrier_align_duration",
"Duration of join align barrier",
exponential_buckets(0.0001, 2.0, 21).unwrap() // max 104s
);
let join_barrier_align_duration = register_guarded_histogram_vec_with_registry!(
let barrier_align_duration = register_guarded_histogram_vec_with_registry!(
opts,
&["actor_id", "fragment_id", "wait_side"],
&["actor_id", "fragment_id", "wait_side", "executor"],
registry
)
.unwrap();

let join_barrier_align_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
let barrier_align_duration = RelabeledGuardedHistogramVec::with_metric_level_relabel_n(
MetricLevel::Debug,
join_barrier_align_duration,
barrier_align_duration,
level,
1,
);
Expand Down Expand Up @@ -1128,9 +1130,9 @@ impl StreamingMetrics {
join_insert_cache_miss_count,
join_actor_input_waiting_duration_ns,
join_match_duration_ns,
join_barrier_align_duration,
join_cached_entry_count,
join_matched_join_keys,
barrier_align_duration,
agg_lookup_miss_count,
agg_total_lookup_count,
agg_cached_entry_count,
Expand Down
44 changes: 39 additions & 5 deletions src/stream/src/executor/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@
use std::collections::BTreeMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::stream::{FusedStream, FuturesUnordered};
use pin_project::pin_project;

use super::watermark::BufferedWatermarks;
use crate::executor::prelude::*;
use crate::task::FragmentId;

/// `UnionExecutor` merges data from multiple inputs.
pub struct UnionExecutor {
inputs: Vec<Executor>,
metrics: Arc<StreamingMetrics>,
actor_context: ActorContextRef,
}

impl std::fmt::Debug for UnionExecutor {
Expand All @@ -34,15 +38,29 @@ impl std::fmt::Debug for UnionExecutor {
}

impl UnionExecutor {
pub fn new(inputs: Vec<Executor>) -> Self {
Self { inputs }
pub fn new(
inputs: Vec<Executor>,
metrics: Arc<StreamingMetrics>,
actor_context: ActorContextRef,
) -> Self {
Self {
inputs,
metrics,
actor_context,
}
}
}

impl Execute for UnionExecutor {
fn execute(self: Box<Self>) -> BoxedMessageStream {
let streams = self.inputs.into_iter().map(|e| e.execute()).collect();
merge(streams).boxed()
merge(
streams,
self.metrics,
self.actor_context.fragment_id,
self.actor_context.id,
)
.boxed()
}
}

Expand All @@ -63,7 +81,12 @@ impl Stream for Input {

/// Merges input streams and aligns with barriers.
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn merge(inputs: Vec<BoxedMessageStream>) {
async fn merge(
inputs: Vec<BoxedMessageStream>,
metrics: Arc<StreamingMetrics>,
fragment_id: FragmentId,
actor_id: ActorId,
) {
let input_num = inputs.len();
let mut active: FuturesUnordered<_> = inputs
.into_iter()
Expand All @@ -82,6 +105,13 @@ async fn merge(inputs: Vec<BoxedMessageStream>) {
// watermark column index -> `BufferedWatermarks`
let mut watermark_buffers = BTreeMap::<usize, BufferedWatermarks<usize>>::new();

let mut start_time = Instant::now();
let barrier_align = metrics.barrier_align_duration.with_label_values(&[
&actor_id.to_string(),
&fragment_id.to_string(),
"",
"Union",
]);
loop {
match active.next().await {
Some((Some(Ok(message)), remaining)) => {
Expand All @@ -106,6 +136,9 @@ async fn merge(inputs: Vec<BoxedMessageStream>) {
}
Message::Barrier(barrier) => {
// Block this upstream by pushing it to `blocked`.
if blocked.is_empty() {
start_time = Instant::now();
}
blocked.push(remaining);
if let Some(cur_barrier) = current_barrier.as_ref() {
if barrier.epoch != cur_barrier.epoch {
Expand All @@ -131,6 +164,7 @@ async fn merge(inputs: Vec<BoxedMessageStream>) {
None => {
assert!(active.is_terminated());
let barrier = current_barrier.take().unwrap();
barrier_align.observe(start_time.elapsed().as_secs_f64());

let upstreams = std::mem::take(&mut blocked);
active.extend(upstreams.into_iter().map(|upstream| upstream.into_future()));
Expand Down Expand Up @@ -174,7 +208,7 @@ mod tests {
.boxed(),
];
let mut output = vec![];
let mut merged = merge(streams).boxed();
let mut merged = merge(streams, Arc::new(StreamingMetrics::unused()), 0, 0).boxed();

let result = vec![
Message::Chunk(StreamChunk::from_pretty("I\n + 1")),
Expand Down
6 changes: 5 additions & 1 deletion src/stream/src/from_proto/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ impl ExecutorBuilder for UnionExecutorBuilder {
_node: &Self::Node,
_store: impl StateStore,
) -> StreamResult<Executor> {
Ok((params.info, UnionExecutor::new(params.input)).into())
Ok((
params.info,
UnionExecutor::new(params.input, params.executor_stats, params.actor_context),
)
.into())
}
}

0 comments on commit 93646a5

Please sign in to comment.