diff --git a/src/stream/src/executor/source/state_table_handler.rs b/src/stream/src/executor/source/state_table_handler.rs index 74a163b01e1b4..ec7139b975ade 100644 --- a/src/stream/src/executor/source/state_table_handler.rs +++ b/src/stream/src/executor/source/state_table_handler.rs @@ -52,13 +52,21 @@ pub struct SourceStateTableHandler { } impl SourceStateTableHandler { - /// Creates a state table with singleton distribution. + /// Creates a state table with singleton distribution (only one vnode 0). + /// + /// `SourceExecutor` is distributed, but its state table is singleton. + /// This is because the `SourceExecutor`s states are not sharded by consistent hash. + /// The state table is `partition_id -> offset_info`, but the partitions (a.k.a split) each actor accesses + /// are assigned by `SourceManager` in meta, instead of `vnode` computed from the `partition_id`. + /// + /// See also `infer_internal_table_catalog` in `src/frontend/src/optimizer/plan_node/generic/source.rs`. pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self { Self { state_table: StateTable::from_table_catalog(table_catalog, store, None).await, } } + /// For [`super::FsFetchExecutor`], each actor accesses splits according to the `vnode` computed from `partition_id`. pub async fn from_table_catalog_with_vnodes( table_catalog: &PbTable, store: S,