diff --git a/src/stream/src/executor/global_simple_agg.rs b/src/stream/src/executor/global_simple_agg.rs index eb8835f5d75ee..5bdcc14996924 100644 --- a/src/stream/src/executor/global_simple_agg.rs +++ b/src/stream/src/executor/global_simple_agg.rs @@ -80,6 +80,18 @@ struct ExecutorInner { extreme_cache_size: usize, } +impl ExecutorInner { + fn all_state_tables_mut(&mut self) -> impl Iterator> { + iter_table_storage(&mut self.storages) + .chain(self.distinct_dedup_tables.values_mut()) + .chain(std::iter::once(&mut self.result_table)) + } + + fn all_state_tables_except_result_mut(&mut self) -> impl Iterator> { + iter_table_storage(&mut self.storages).chain(self.distinct_dedup_tables.values_mut()) + } +} + struct ExecutionVars { /// The single [`AggGroup`]. agg_group: AggGroup, @@ -215,9 +227,8 @@ impl GlobalSimpleAggExecutor { // Commit all state tables except for result table. futures::future::try_join_all( - iter_table_storage(&mut this.storages) - .chain(this.distinct_dedup_tables.values_mut()) - .map(|state_table| state_table.commit(epoch)), + this.all_state_tables_except_result_mut() + .map(|table| table.commit(epoch)), ) .await?; @@ -263,12 +274,9 @@ impl GlobalSimpleAggExecutor { } else { // No state is changed. // Call commit on state table to increment the epoch. - iter_table_storage(&mut this.storages) - .chain(this.distinct_dedup_tables.values_mut()) - .for_each(|state_table| { - state_table.commit_no_data_expected(epoch); - }); - this.result_table.commit_no_data_expected(epoch); + this.all_state_tables_mut().for_each(|table| { + table.commit_no_data_expected(epoch); + }); Ok(None) } } @@ -282,12 +290,9 @@ impl GlobalSimpleAggExecutor { let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; - iter_table_storage(&mut this.storages) - .chain(this.distinct_dedup_tables.values_mut()) - .for_each(|state_table| { - state_table.init_epoch(barrier.epoch); - }); - this.result_table.init_epoch(barrier.epoch); + this.all_state_tables_mut().for_each(|table| { + table.init_epoch(barrier.epoch); + }); let mut vars = ExecutionVars { // Create `AggGroup`. This will fetch previous agg result from the result table. diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 2bb586abfba41..582fd734f1e98 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -108,6 +108,14 @@ struct ExecutorInner { metrics: Arc, } +impl ExecutorInner { + fn all_state_tables_mut(&mut self) -> impl Iterator> { + iter_table_storage(&mut self.storages) + .chain(self.distinct_dedup_tables.values_mut()) + .chain(std::iter::once(&mut self.result_table)) + } +} + struct ExecutionVars { stats: ExecutionStats, @@ -484,39 +492,25 @@ impl HashAggExecutor { vars.distinct_dedup.flush(&mut this.distinct_dedup_tables)?; // Commit all state tables. - futures::future::try_join_all( - iter_table_storage(&mut this.storages) - .chain(this.distinct_dedup_tables.values_mut()) - .map(|state_table| async { - if let Some(watermark) = state_clean_watermark.as_ref() { - state_table.update_watermark(watermark.clone()) - }; - state_table.commit(epoch).await - }), - ) + futures::future::try_join_all(this.all_state_tables_mut().map(|table| async { + if let Some(watermark) = state_clean_watermark.as_ref() { + table.update_watermark(watermark.clone()) + }; + table.commit(epoch).await + })) .await?; - if let Some(watermark) = state_clean_watermark.as_ref() { - this.result_table.update_watermark(watermark.clone()); - }; - this.result_table.commit(epoch).await?; // Evict cache to target capacity. vars.agg_group_cache.evict(); } else { // Nothing to flush. // Call commit on state table to increment the epoch. - iter_table_storage(&mut this.storages) - .chain(this.distinct_dedup_tables.values_mut()) - .for_each(|state_table| { - if let Some(watermark) = state_clean_watermark.as_ref() { - state_table.update_watermark(watermark.clone()) - }; - state_table.commit_no_data_expected(epoch); - }); - if let Some(watermark) = state_clean_watermark.as_ref() { - this.result_table.update_watermark(watermark.clone()); - }; - this.result_table.commit_no_data_expected(epoch); + this.all_state_tables_mut().for_each(|table| { + if let Some(watermark) = state_clean_watermark.as_ref() { + table.update_watermark(watermark.clone()) + }; + table.commit_no_data_expected(epoch); + }); return Ok(()); } } @@ -551,12 +545,9 @@ impl HashAggExecutor { // First barrier let mut input = input.execute(); let barrier = expect_first_barrier(&mut input).await?; - iter_table_storage(&mut this.storages) - .chain(this.distinct_dedup_tables.values_mut()) - .for_each(|state_table| { - state_table.init_epoch(barrier.epoch); - }); - this.result_table.init_epoch(barrier.epoch); + this.all_state_tables_mut().for_each(|table| { + table.init_epoch(barrier.epoch); + }); vars.agg_group_cache.update_epoch(barrier.epoch.curr); vars.distinct_dedup.dedup_caches_mut().for_each(|cache| { cache.update_epoch(barrier.epoch.curr); @@ -593,13 +584,10 @@ impl HashAggExecutor { // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) { - iter_table_storage(&mut this.storages) - .chain(this.distinct_dedup_tables.values_mut()) - .for_each(|state_table| { - let _ = state_table.update_vnode_bitmap(vnode_bitmap.clone()); - }); - let previous_vnode_bitmap = - this.result_table.update_vnode_bitmap(vnode_bitmap.clone()); + let previous_vnode_bitmap = this.result_table.vnodes().clone(); + this.all_state_tables_mut().for_each(|table| { + let _ = table.update_vnode_bitmap(vnode_bitmap.clone()); + }); // Manipulate the cache if necessary. if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {