diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index b0dd649c59d1a..08312c0c882dc 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1127,24 +1127,6 @@ where Ok(()) } - // TODO(st1page): maybe we should extract a pub struct to do it - /// just specially used by those state table read-only and after the call the data - /// in the epoch will be visible - pub fn commit_no_data_expected(&mut self, new_epoch: EpochPair) { - assert_eq!(self.epoch(), new_epoch.prev); - assert!(!self.is_dirty()); - // Tick the watermark buffer here because state table is expected to be committed once - // per epoch. - self.watermark_buffer_strategy.tick(); - self.local_store.seal_current_epoch( - new_epoch.curr, - SealCurrentEpochOptions { - table_watermarks: None, - switch_op_consistency_level: None, - }, - ); - } - /// Write to state store. async fn seal_current_epoch( &mut self, diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs index 76f9271862e29..fa39c87d74ea6 100644 --- a/src/stream/src/executor/backfill/arrangement_backfill.rs +++ b/src/stream/src/executor/backfill/arrangement_backfill.rs @@ -374,7 +374,6 @@ where } // consume upstream buffer chunk - let upstream_chunk_buffer_is_empty = upstream_chunk_buffer.is_empty(); for chunk in upstream_chunk_buffer.drain(..) { cur_barrier_upstream_processed_rows += chunk.cardinality() as u64; // FIXME: Replace with `snapshot_is_processed` @@ -397,11 +396,7 @@ where upstream_table.write_chunk(chunk); } - if upstream_chunk_buffer_is_empty { - upstream_table.commit_no_data_expected(barrier.epoch) - } else { - upstream_table.commit(barrier.epoch).await?; - } + upstream_table.commit(barrier.epoch).await?; self.metrics .arrangement_backfill_snapshot_read_row_count diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 8937d52607748..8fc27ab5864ae 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -555,10 +555,7 @@ pub(crate) async fn flush_data( ) -> StreamExecutorResult<()> { let vnodes = table.vnodes().clone(); if let Some(old_state) = old_state { - if old_state[1..] == current_partial_state[1..] { - table.commit_no_data_expected(epoch); - return Ok(()); - } else { + if old_state[1..] != current_partial_state[1..] { vnodes.iter_vnodes_scalar().for_each(|vnode| { let datum = Some(vnode.into()); current_partial_state[0] = datum.clone(); @@ -727,7 +724,6 @@ pub(crate) async fn persist_state_per_vnode, ) -> StreamExecutorResult<()> { - let mut has_progress = false; for vnode in vnodes { if !backfill_state.need_commit(&vnode) { continue; @@ -762,7 +758,6 @@ pub(crate) async fn persist_state_per_vnode( flush_data(table, epoch, old_state, current_state).await?; *old_state = Some(current_state.into()); } else { - table.commit_no_data_expected(epoch); + table.commit(epoch).await?; } Ok(()) } diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index b2d17e9da638f..6049b963359ab 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -76,8 +76,6 @@ impl AppendOnlyDedupExecutor { // The first barrier message should be propagated. yield Message::Barrier(barrier); - let mut commit_data = false; - #[for_await] for msg in input { self.cache.evict(); @@ -127,21 +125,13 @@ impl AppendOnlyDedupExecutor { let chunk = StreamChunk::with_visibility(ops, columns, vis); self.state_table.write_chunk(chunk.clone()); - commit_data = true; - yield Message::Chunk(chunk); } self.state_table.try_flush().await?; } Message::Barrier(barrier) => { - if commit_data { - // Only commit when we have new data in this epoch. - self.state_table.commit(barrier.epoch).await?; - commit_data = false; - } else { - self.state_table.commit_no_data_expected(barrier.epoch); - } + self.state_table.commit(barrier.epoch).await?; if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(self.ctx.id) { let (_prev_vnode_bitmap, cache_may_stale) = diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 14cf7192bd4db..d3299d99e8a33 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -462,17 +462,13 @@ impl DynamicFilterExecutor NowExecutor { last_timestamp = state_row.and_then(|row| row[0].clone()); paused = barrier.is_pause_on_startup(); initialized = true; - } else if paused { - // Assert that no data is updated. - state_table.commit_no_data_expected(barrier.epoch); } else { state_table.commit(barrier.epoch).await?; } diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index f957241a402c9..b8915a070dbc5 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -214,12 +214,6 @@ impl SimpleAggExecutor { this.intermediate_state_table .update_without_old_value(encoded_states); - // Commit all state tables. - futures::future::try_join_all( - this.all_state_tables_mut().map(|table| table.commit(epoch)), - ) - .await?; - // Retrieve modified states and put the changes into the builders. vars.agg_group .build_change(&this.storages, &this.agg_funcs) @@ -227,13 +221,13 @@ impl SimpleAggExecutor { .map(|change| change.to_stream_chunk(&this.info.schema.data_types())) } else { // No state is changed. - // Call commit on state table to increment the epoch. - this.all_state_tables_mut().for_each(|table| { - table.commit_no_data_expected(epoch); - }); None }; + // Commit all state tables. + futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.commit(epoch))) + .await?; + vars.state_changed = false; Ok(chunk) } @@ -241,7 +235,6 @@ impl SimpleAggExecutor { async fn try_flush_data(this: &mut ExecutorInner) -> StreamExecutorResult<()> { futures::future::try_join_all(this.all_state_tables_mut().map(|table| table.try_flush())) .await?; - Ok(()) } diff --git a/src/stream/src/executor/sort.rs b/src/stream/src/executor/sort.rs index 786eabdeabbf5..f07967ce45299 100644 --- a/src/stream/src/executor/sort.rs +++ b/src/stream/src/executor/sort.rs @@ -53,7 +53,6 @@ struct ExecutorInner { struct ExecutionVars { buffer: SortBuffer, - buffer_changed: bool, } impl Executor for SortExecutor { @@ -103,7 +102,6 @@ impl SortExecutor { let mut vars = ExecutionVars { buffer: SortBuffer::new(this.sort_column_index, &this.buffer_table), - buffer_changed: false, }; // Populate the sort buffer cache on initialization. @@ -131,7 +129,6 @@ impl SortExecutor { if let Some(chunk) = chunk_builder.take() { yield Message::Chunk(chunk); } - vars.buffer_changed = true; yield Message::Watermark(watermark); } @@ -141,16 +138,10 @@ impl SortExecutor { } Message::Chunk(chunk) => { vars.buffer.apply_chunk(chunk, &mut this.buffer_table); - vars.buffer_changed = true; this.buffer_table.try_flush().await?; } Message::Barrier(barrier) => { - if vars.buffer_changed { - this.buffer_table.commit(barrier.epoch).await?; - } else { - this.buffer_table.commit_no_data_expected(barrier.epoch); - } - vars.buffer_changed = false; + this.buffer_table.commit(barrier.epoch).await?; // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) { diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 43a6ba3add1ef..6e9c3a49856de 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -269,11 +269,10 @@ impl WatermarkFilterExecutor { table.insert(row); } } - table.commit(barrier.epoch).await?; - } else { - table.commit_no_data_expected(barrier.epoch); } + table.commit(barrier.epoch).await?; + if barrier.kind.is_checkpoint() { if idle_input { // Align watermark