diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index a93112ba71089..63c54b5cfe599 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -77,6 +77,7 @@ impl Execute for WatermarkFilterExecutor { self.execute_inner().boxed() } } +const UPDATE_GLIOBAL_WATERMARK_FREQUENCY: usize = 10; impl WatermarkFilterExecutor { #[try_stream(ok = Message, error = StreamExecutorError)] @@ -124,7 +125,7 @@ impl WatermarkFilterExecutor { // If the input is idle let mut idle_input = true; - + let mut barrier_num_during_idle = 0; #[for_await] for msg in input { let msg = msg?; @@ -255,35 +256,42 @@ impl WatermarkFilterExecutor { HummockReadEpoch::Committed(prev_epoch), ) .await?; - } else if is_checkpoint { - if idle_input { - // Align watermark - // NOTE(st1page): Should be `NoWait` because it could lead to a degradation of concurrent checkpoint situations, as it would require waiting for the previous epoch - let global_max_watermark = Self::get_global_max_watermark( - &table, - &global_watermark_table, - HummockReadEpoch::NoWait(prev_epoch), - ) - .await?; + } - current_watermark = if let Some(global_max_watermark) = - global_max_watermark.clone() - && let Some(watermark) = current_watermark.clone() - { - Some(cmp::max_by( - watermark, - global_max_watermark, - DefaultOrd::default_cmp, - )) - } else { - current_watermark.or(global_max_watermark) - }; - if let Some(watermark) = current_watermark.clone() { - yield Message::Watermark(Watermark::new( - event_time_col_idx, - watermark_type.clone(), - watermark, - )); + if is_checkpoint { + if idle_input { + barrier_num_during_idle += 1; + + if barrier_num_during_idle == UPDATE_GLIOBAL_WATERMARK_FREQUENCY { + barrier_num_during_idle = 0; + // Align watermark + // NOTE(st1page): Should be `NoWait` because it could lead to a degradation of concurrent checkpoint situations, as it would require waiting for the previous epoch + let global_max_watermark = Self::get_global_max_watermark( + &table, + &global_watermark_table, + HummockReadEpoch::NoWait(prev_epoch), + ) + .await?; + + current_watermark = if let Some(global_max_watermark) = + global_max_watermark.clone() + && let Some(watermark) = current_watermark.clone() + { + Some(cmp::max_by( + watermark, + global_max_watermark, + DefaultOrd::default_cmp, + )) + } else { + current_watermark.or(global_max_watermark) + }; + if let Some(watermark) = current_watermark.clone() { + yield Message::Watermark(Watermark::new( + event_time_col_idx, + watermark_type.clone(), + watermark, + )); + } } } else { idle_input = true;