diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 25003aff7f70..c533b1d413fb 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -158,6 +158,9 @@ struct ExecutionVars { chunk_builder: StreamChunkBuilder, buffer: SortBuffer, + + /// The stream is paused. + is_paused: bool, } #[derive(Debug, Default)] @@ -539,7 +542,7 @@ impl HashAggExecutor { Ok(()) } - async fn try_flush_data(this: &mut ExecutorInner) -> StreamExecutorResult<()> { + async fn try_flush_table(this: &mut ExecutorInner) -> StreamExecutorResult<()> { futures::future::try_join_all( this.all_state_tables_mut() .map(|table| async { table.try_flush().await }), @@ -589,6 +592,7 @@ impl HashAggExecutor { window_watermark: None, chunk_builder: StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types()), buffer: SortBuffer::new(window_col_idx_in_group_key, &this.intermediate_state_table), + is_paused: false, }; // TODO(rc): use something like a `ColumnMapping` type @@ -602,12 +606,12 @@ impl HashAggExecutor { // First barrier let mut input = input.execute(); - let barrier = expect_first_barrier(&mut input).await?; + let first_barrier = expect_first_barrier(&mut input).await?; this.all_state_tables_mut().for_each(|table| { - table.init_epoch(barrier.epoch); + table.init_epoch(first_barrier.epoch); }); - - yield Message::Barrier(barrier); + vars.is_paused = first_barrier.is_pause_on_startup(); + yield Message::Barrier(first_barrier); #[for_await] for msg in input { @@ -635,31 +639,34 @@ impl HashAggExecutor { } } - Self::try_flush_data(&mut this).await?; + Self::try_flush_table(&mut this).await?; } Message::Barrier(barrier) => { - #[for_await] - for chunk in Self::flush_data(&mut this, &mut vars) { - yield Message::Chunk(chunk?); - } - Self::flush_metrics(&this, &mut vars); - Self::commit_state_tables(&mut this, barrier.epoch).await?; - - if this.emit_on_window_close { - // ignore watermarks on other columns - if let Some(watermark) = - vars.buffered_watermarks[window_col_idx_in_group_key].take() - { - yield Message::Watermark(watermark); + if !vars.is_paused { + #[for_await] + for chunk in Self::flush_data(&mut this, &mut vars) { + yield Message::Chunk(chunk?); } - } else { - for buffered_watermark in &mut vars.buffered_watermarks { - if let Some(watermark) = buffered_watermark.take() { + + if this.emit_on_window_close { + // ignore watermarks on other columns + if let Some(watermark) = + vars.buffered_watermarks[window_col_idx_in_group_key].take() + { yield Message::Watermark(watermark); } + } else { + for buffered_watermark in &mut vars.buffered_watermarks { + if let Some(watermark) = buffered_watermark.take() { + yield Message::Watermark(watermark); + } + } } } + Self::flush_metrics(&this, &mut vars); + Self::commit_state_tables(&mut this, barrier.epoch).await?; + // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) { let previous_vnode_bitmap = this.intermediate_state_table.vnodes().clone(); @@ -669,6 +676,9 @@ impl HashAggExecutor { // Manipulate the cache if necessary. if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) { + // 1. If it's not paused, we should already flushed dirty groups in `flush_data`. + // 2. If it's paused, there should be no `Chunk` message to produce new dirty groups. + assert!(vars.dirty_groups.is_empty()); vars.agg_group_cache.clear(); vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { cache.clear(); @@ -676,6 +686,18 @@ impl HashAggExecutor { } } + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => { + vars.is_paused = true; + } + Mutation::Resume => { + vars.is_paused = false; + } + _ => (), + } + } + yield Message::Barrier(barrier); } }