diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 0f8f19a4f1325..8500723c443b2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -67,6 +67,13 @@ impl StreamTableScan { None => Distribution::SomeShard, } }; + // For single distribution, we can't use arrangement backfill. + let stream_scan_type = match stream_scan_type { + StreamScanType::ArrangementBackfill if distribution == Distribution::Single => { + StreamScanType::Backfill + } + _ => stream_scan_type, + }; let base = PlanBase::new_stream_with_core( &core, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 8e19e35fd5bab..c4629224650e5 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -330,6 +330,11 @@ where pk_indices.iter().position(|&i| vnode_col_idx == i) }); + // Upstream must have dist_key_in_pk_indices, otherwise that means it is singleton distribution, + // and it should use `no_shuffle_backfill` instead. + if IS_REPLICATED && is_consistent_op { + assert!(!dist_key_in_pk_indices.is_empty()); + } let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);