diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 4b190b0bdcfa2..aadbccc1b97f6 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1293,22 +1293,6 @@ where } } -impl< - S, - SD, - const IS_REPLICATED: bool, - W: WatermarkBufferStrategy, - const USE_WATERMARK_CACHE: bool, - > StateTableInner -where - S: StateStore, - SD: ValueRowSerde, -{ - pub fn get_schema_len(&self) -> usize { - self.pk_indices.len() + self.value_indices.as_ref().map_or(0, |v| v.len()) - } -} - pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> = impl Stream>> + 'a; diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs index 6ee2f48804ba9..524b24afed509 100644 --- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs +++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs @@ -131,14 +131,6 @@ where // +1 for vnode, +1 for backfill_finished, +1 for row_count. let state_len = pk_in_output_indices.len() + 3; - // Handle backwards compatibility of old schema: - // | vnode | pk ... | backfill_finished | - let row_count_is_persisted = if let Some(state_table) = self.state_table.as_ref() { - state_table.get_schema_len() == state_len - } else { - false - }; - let pk_order = self.upstream_table.pk_serializer().get_order_types(); let upstream_table_id = self.upstream_table.table_id().table_id; @@ -153,7 +145,7 @@ where } let is_finished = if let Some(state_table) = self.state_table.as_ref() { - let is_finished = check_all_vnode_finished(state_table, row_count_is_persisted).await?; + let is_finished = check_all_vnode_finished(state_table, state_len).await?; if is_finished { assert!(!first_barrier.is_newly_added(self.actor_id)); } @@ -228,8 +220,8 @@ where // Keep track of rows from the snapshot. let mut total_snapshot_processed_rows: u64 = - if let Some(state_table) = self.state_table.as_ref() && row_count_is_persisted { - get_row_count_state(state_table).await? + if let Some(state_table) = self.state_table.as_ref() { + get_row_count_state(state_table, state_len).await? } else { // Maintain backwards compatibility with no state_table. 0 diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs index 93b95209a960f..03f107f32fbee 100644 --- a/src/stream/src/executor/backfill/utils.rs +++ b/src/stream/src/executor/backfill/utils.rs @@ -308,19 +308,21 @@ pub(crate) async fn get_progress_per_vnode Option { - let datum = if row_count_is_persisted { - // Row count is the last datum in the schema. - // backfill is second last. - row.datum_at(row.len() - 2) - } else { - // handle backwards compat case where there isn't a row count persisted. +pub(crate) fn get_backfill_finished(row: OwnedRow, state_len: usize) -> Option { + let datum = if row.len() < state_len { + // Handle backwards compatibility case where + // we did not have row count. row.last() + } else { + row.datum_at(row.len() - 2) }; datum.map(|d| d.into_bool()) } -pub(crate) fn get_row_count(row: OwnedRow) -> u64 { +pub(crate) fn get_row_count(row: OwnedRow, state_len: usize) -> u64 { + if row.len() == state_len - 1 { + return 0; + } match row.last() { None => 0, Some(d) => d.into_int64() as u64, @@ -329,6 +331,7 @@ pub(crate) fn get_row_count(row: OwnedRow) -> u64 { pub(crate) async fn get_row_count_state( state_table: &StateTableInner, + state_len: usize, ) -> StreamExecutorResult { let mut vnodes = state_table.vnodes().iter_vnodes_scalar(); let vnode = vnodes.next().unwrap(); @@ -336,7 +339,7 @@ pub(crate) async fn get_row_count_state 0, - Some(row) => get_row_count(row), + Some(row) => get_row_count(row, state_len), }; Ok(row_count) } @@ -344,7 +347,7 @@ pub(crate) async fn get_row_count_state( state_table: &StateTableInner, - row_count_is_persisted: bool, + state_len: usize, ) -> StreamExecutorResult { debug_assert!(!state_table.vnode_bitmap().is_empty()); let vnodes = state_table.vnodes().iter_vnodes_scalar(); @@ -354,7 +357,7 @@ pub(crate) async fn check_all_vnode_finished