diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs
index 709215e69eaa0..73157f91c2d0c 100644
--- a/src/meta/src/barrier/command.rs
+++ b/src/meta/src/barrier/command.rs
@@ -81,18 +81,12 @@ pub struct Reschedule {
     pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
 }
 
-/// Replacing an old table with a new one. Used for `ALTER TABLE` and sink into table. All actors in the table job will be rebuilt.
 #[derive(Debug, Clone)]
 pub struct ReplaceTablePlan {
     pub old_table_fragments: TableFragments,
     pub new_table_fragments: TableFragments,
     pub merge_updates: Vec<MergeUpdate>,
     pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
-    /// For a table with connector, the `SourceExecutor` actor will also be rebuilt with new actor ids.
-    /// We need to reassign splits for it.
-    ///
-    /// Note that there's no `SourceBackfillExecutor` involved for table with connector, so we don't need to worry about
-    /// backfill_splits.
     pub init_split_assignment: SplitAssignment,
 }
 
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index 7caa42f90ca26..09e0488f6ff91 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -258,8 +258,6 @@ where
     W: WatermarkBufferStrategy,
 {
     /// Create state table from table catalog and store.
-    ///
-    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
     pub async fn from_table_catalog(
         table_catalog: &Table,
         store: S,
@@ -1358,6 +1356,9 @@ where
     /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
     /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
     /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
+
+    /// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
+    /// `vnode`.
     pub async fn iter_with_prefix(
         &self,
         pk_prefix: impl Row,
diff --git a/src/stream/src/executor/source/source_backfill_state_table.rs b/src/stream/src/executor/source/source_backfill_state_table.rs
index be9abe8490e63..678a76f39f927 100644
--- a/src/stream/src/executor/source/source_backfill_state_table.rs
+++ b/src/stream/src/executor/source/source_backfill_state_table.rs
@@ -33,7 +33,6 @@ pub struct BackfillStateTableHandler<S: StateStore> {
 }
 
 impl<S: StateStore> BackfillStateTableHandler<S> {
-    /// See also [`super::SourceStateTableHandler::from_table_catalog`] for how the state table looks like.
     pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
         Self {
             state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs
index 5f9bd2c9f2c55..f85f12c793a2b 100644
--- a/src/stream/src/executor/source/state_table_handler.rs
+++ b/src/stream/src/executor/source/state_table_handler.rs
@@ -51,21 +51,12 @@ pub struct SourceStateTableHandler<S: StateStore> {
 }
 
 impl<S: StateStore> SourceStateTableHandler<S> {
-    /// Creates a state table with singleton distribution (only one vnode 0).
-    ///
-    /// `SourceExecutor` is distributed, but its state table is singleton.
-    /// This is because the `SourceExecutor`s states are not sharded by consistent hash.
-    /// The state table is `partition_id -> offset_info`, but the partitions (a.k.a split) each actor accesses
-    /// are assigned by `SourceManager` in meta, instead of `vnode` computed from the `partition_id`.
-    ///
-    /// See also `infer_internal_table_catalog` in `src/frontend/src/optimizer/plan_node/generic/source.rs`.
     pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
         Self {
             state_table: StateTable::from_table_catalog(table_catalog, store, None).await,
         }
     }
 
-    /// For [`super::FsFetchExecutor`], each actor accesses splits according to the `vnode` computed from `partition_id`.
     pub async fn from_table_catalog_with_vnodes(
         table_catalog: &PbTable,
         store: S,
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 53d9df2e183f6..ae60ebb0c726a 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -190,12 +190,11 @@ impl ExecutorBuilder for SourceExecutorBuilder {
             .map(|column| column.column_id)
             .collect();
 
-        let mut state_table = source.state_table.clone().unwrap();
-        // To make it possible to scan the whole state table.
-        // This is quite wild, can we do this?
-        state_table.read_prefix_len_hint = 0;
-        let state_table_handler =
-            SourceStateTableHandler::from_table_catalog(&state_table, store.clone()).await;
+        let state_table_handler = SourceStateTableHandler::from_table_catalog(
+            source.state_table.as_ref().unwrap(),
+            store.clone(),
+        )
+        .await;
         let stream_source_core = StreamSourceCore::new(
             source_id,
             source_name,