diff --git a/e2e_test/streaming/watermark.slt b/e2e_test/streaming/watermark.slt
index 5d8f189dfd965..a7bc67066fd1e 100644
--- a/e2e_test/streaming/watermark.slt
+++ b/e2e_test/streaming/watermark.slt
@@ -21,7 +21,7 @@ statement ok
 insert into t values ('2023-05-06 16:56:01', 1);
 
 skipif in-memory
-sleep 10s
+sleep 20s
 
 skipif in-memory
 query TI
diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs
index 2aca1251dd058..cf05464917674 100644
--- a/src/stream/src/executor/watermark_filter.rs
+++ b/src/stream/src/executor/watermark_filter.rs
@@ -77,6 +77,7 @@ impl Execute for WatermarkFilterExecutor {
         self.execute_inner().boxed()
     }
 }
+const UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE: usize = 5;
 
 impl WatermarkFilterExecutor {
     #[try_stream(ok = Message, error = StreamExecutorError)]
@@ -99,13 +100,18 @@ impl WatermarkFilterExecutor {
         let mut input = input.execute();
 
         let first_barrier = expect_first_barrier(&mut input).await?;
+        let prev_epoch = first_barrier.epoch.prev;
         table.init_epoch(first_barrier.epoch);
         // The first barrier message should be propagated.
         yield Message::Barrier(first_barrier);
 
         // Initiate and yield the first watermark.
-        let mut current_watermark =
-            Self::get_global_max_watermark(&table, &global_watermark_table).await?;
+        let mut current_watermark = Self::get_global_max_watermark(
+            &table,
+            &global_watermark_table,
+            HummockReadEpoch::Committed(prev_epoch),
+        )
+        .await?;
 
         let mut last_checkpoint_watermark = None;
 
@@ -119,7 +125,7 @@ impl WatermarkFilterExecutor {
 
         // If the input is idle
         let mut idle_input = true;
-
+        let mut barrier_num_during_idle = 0;
         #[for_await]
         for msg in input {
             let msg = msg?;
@@ -208,6 +214,9 @@ impl WatermarkFilterExecutor {
                     }
                 }
                 Message::Barrier(barrier) => {
+                    let prev_epoch = barrier.epoch.prev;
+                    let is_checkpoint = barrier.kind.is_checkpoint();
+                    let mut need_update_global_max_watermark = false;
                     // Update the vnode bitmap for state tables of all agg calls if asked.
                     if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
                         let other_vnodes_bitmap = Arc::new(
@@ -220,15 +229,11 @@ impl WatermarkFilterExecutor {
 
                         // Take the global max watermark when scaling happens.
                         if previous_vnode_bitmap != vnode_bitmap {
-                            current_watermark =
-                                Self::get_global_max_watermark(&table, &global_watermark_table)
-                                    .await?;
+                            need_update_global_max_watermark = true;
                         }
                     }
 
-                    if barrier.kind.is_checkpoint()
-                        && last_checkpoint_watermark != current_watermark
-                    {
+                    if is_checkpoint && last_checkpoint_watermark != current_watermark {
                         last_checkpoint_watermark.clone_from(&current_watermark);
                         // Persist the watermark when checkpoint arrives.
                         if let Some(watermark) = current_watermark.clone() {
@@ -242,39 +247,59 @@ impl WatermarkFilterExecutor {
                     }
 
                     table.commit(barrier.epoch).await?;
+                    yield Message::Barrier(barrier);
+
+                    if need_update_global_max_watermark {
+                        current_watermark = Self::get_global_max_watermark(
+                            &table,
+                            &global_watermark_table,
+                            HummockReadEpoch::Committed(prev_epoch),
+                        )
+                        .await?;
+                    }
 
-                    if barrier.kind.is_checkpoint() {
+                    if is_checkpoint {
                         if idle_input {
-                            // Align watermark
-                            let global_max_watermark =
-                                Self::get_global_max_watermark(&table, &global_watermark_table)
-                                    .await?;
-
-                            current_watermark = if let Some(global_max_watermark) =
-                                global_max_watermark.clone()
-                                && let Some(watermark) = current_watermark.clone()
+                            barrier_num_during_idle += 1;
+
+                            if barrier_num_during_idle
+                                == UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE
                             {
-                                Some(cmp::max_by(
-                                    watermark,
-                                    global_max_watermark,
-                                    DefaultOrd::default_cmp,
-                                ))
-                            } else {
-                                current_watermark.or(global_max_watermark)
-                            };
-                            if let Some(watermark) = current_watermark.clone() {
-                                yield Message::Watermark(Watermark::new(
-                                    event_time_col_idx,
-                                    watermark_type.clone(),
-                                    watermark,
-                                ));
+                                barrier_num_during_idle = 0;
+                                // Align watermark
+                                // NOTE(st1page): This should be `NoWait`, because waiting for the previous epoch here could degrade concurrent checkpointing.
+                                let global_max_watermark = Self::get_global_max_watermark(
+                                    &table,
+                                    &global_watermark_table,
+                                    HummockReadEpoch::NoWait(prev_epoch),
+                                )
+                                .await?;
+
+                                current_watermark = if let Some(global_max_watermark) =
+                                    global_max_watermark.clone()
+                                    && let Some(watermark) = current_watermark.clone()
+                                {
+                                    Some(cmp::max_by(
+                                        watermark,
+                                        global_max_watermark,
+                                        DefaultOrd::default_cmp,
+                                    ))
+                                } else {
+                                    current_watermark.or(global_max_watermark)
+                                };
+                                if let Some(watermark) = current_watermark.clone() {
+                                    yield Message::Watermark(Watermark::new(
+                                        event_time_col_idx,
+                                        watermark_type.clone(),
+                                        watermark,
+                                    ));
+                                }
                             }
                         } else {
                             idle_input = true;
+                            barrier_num_during_idle = 0;
                         }
                     }
-
-                    yield Message::Barrier(barrier);
                 }
             }
         }
@@ -301,8 +326,8 @@ impl WatermarkFilterExecutor {
     async fn get_global_max_watermark(
        table: &StateTable<S>,
         global_watermark_table: &StorageTable<S>,
+        wait_epoch: HummockReadEpoch,
     ) -> StreamExecutorResult<Option<ScalarImpl>> {
-        let epoch = table.epoch();
         let handle_watermark_row = |watermark_row: Option<OwnedRow>| match watermark_row {
             Some(row) => {
                 if row.len() == 1 {
@@ -319,9 +344,8 @@ impl WatermarkFilterExecutor {
             .iter_vnodes()
             .map(|vnode| async move {
                 let pk = row::once(vnode.to_datum());
-                let watermark_row: Option<OwnedRow> = global_watermark_table
-                    .get_row(pk, HummockReadEpoch::NoWait(epoch))
-                    .await?;
+                let watermark_row: Option<OwnedRow> =
+                    global_watermark_table.get_row(pk, wait_epoch).await?;
                 handle_watermark_row(watermark_row)
             });
         let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move {
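
For reference, the throttling that UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE introduces can be read in isolation from the executor. The following is a minimal, self-contained Rust sketch of that counting logic only, not RisingWave code: IdleAligner, fetch_global_max, and the u64 watermark type are hypothetical stand-ins for the executor's inline state, get_global_max_watermark, and the real watermark scalar.

// Sketch (not RisingWave code): isolate the idle-barrier throttling from the diff above.
// The real executor keeps this state inline and reads the global watermark table via
// `get_global_max_watermark(.., HummockReadEpoch::NoWait(prev_epoch))`.
const UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE: usize = 5;

struct IdleAligner {
    idle_input: bool,
    barrier_num_during_idle: usize,
}

impl IdleAligner {
    fn new() -> Self {
        Self {
            idle_input: true,
            barrier_num_during_idle: 0,
        }
    }

    /// Called for every data chunk seen between barriers.
    fn on_data(&mut self) {
        self.idle_input = false;
    }

    /// Called once per checkpoint barrier; returns the watermark to emit, if any.
    fn on_checkpoint_barrier(
        &mut self,
        current_watermark: &mut Option<u64>,
        fetch_global_max: impl FnOnce() -> Option<u64>,
    ) -> Option<u64> {
        if self.idle_input {
            self.barrier_num_during_idle += 1;
            if self.barrier_num_during_idle == UPDATE_GLOBAL_WATERMARK_FREQUENCY_WHEN_IDLE {
                self.barrier_num_during_idle = 0;
                // Only every N-th idle checkpoint pays for a global read.
                let global = fetch_global_max();
                *current_watermark = match (current_watermark.take(), global) {
                    (Some(local), Some(global)) => Some(local.max(global)),
                    (local, global) => local.or(global),
                };
                return *current_watermark;
            }
        } else {
            // Data arrived since the last barrier: start counting idleness afresh.
            self.idle_input = true;
            self.barrier_num_during_idle = 0;
        }
        None
    }
}

fn main() {
    let mut aligner = IdleAligner::new();
    let mut watermark = Some(10u64);
    // Five idle checkpoints in a row: only the fifth triggers a global read.
    for i in 1..=5 {
        let emitted = aligner.on_checkpoint_barrier(&mut watermark, || Some(42));
        println!("barrier {i}: emitted = {emitted:?}");
    }
    assert_eq!(watermark, Some(42));
}

The point of the counter is that an idle fragment only hits the state store on every fifth checkpoint barrier rather than on every one, which is what the change to execute_inner in the diff accomplishes.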