From e5e1d5452535cbdf8301194229e36018a3f7619c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 27 Mar 2024 01:01:40 +0800 Subject: [PATCH 1/3] fix --- src/common/src/session_config/mod.rs | 2 +- src/frontend/src/optimizer/plan_node/stream_table_scan.rs | 7 +++++++ src/stream/src/common/table/state_table.rs | 5 +++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 066076681ffa6..887f70746559f 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -140,7 +140,7 @@ pub struct ConfigMap { streaming_enable_bushy_join: bool, /// Enable arrangement backfill for streaming queries. Defaults to false. - #[parameter(default = false)] + #[parameter(default = true)] streaming_use_arrangement_backfill: bool, /// Allow `jsonb` in stream key diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 0f8f19a4f1325..8500723c443b2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -67,6 +67,13 @@ impl StreamTableScan { None => Distribution::SomeShard, } }; + // For single distribution, we can't use arrangement backfill. + let stream_scan_type = match stream_scan_type { + StreamScanType::ArrangementBackfill if distribution == Distribution::Single => { + StreamScanType::Backfill + } + _ => stream_scan_type, + }; let base = PlanBase::new_stream_with_core( &core, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 8e19e35fd5bab..2071f8e747321 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -330,6 +330,11 @@ where pk_indices.iter().position(|&i| vnode_col_idx == i) }); + // Upstream must have dist_key_in_pk_indices, otherwise that means it is singleton distribution, + // and it should use `no_shuffle_backfill` instead. + if IS_REPLICATED { + assert!(!dist_key_in_pk_indices.is_empty()); + } let distribution = TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk); From 07d178e5ea4787708446da29fcc8da8ebc6db7e9 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 27 Mar 2024 01:28:16 +0800 Subject: [PATCH 2/3] fix --- src/stream/src/common/table/state_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 2071f8e747321..c4629224650e5 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -332,7 +332,7 @@ where // Upstream must have dist_key_in_pk_indices, otherwise that means it is singleton distribution, // and it should use `no_shuffle_backfill` instead. - if IS_REPLICATED { + if IS_REPLICATED && is_consistent_op { assert!(!dist_key_in_pk_indices.is_empty()); } let distribution = From ba7ed0b7f4ff2391d81a28174e412886ec8bf263 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 27 Mar 2024 02:55:33 +0800 Subject: [PATCH 3/3] revert session config --- src/common/src/session_config/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 887f70746559f..066076681ffa6 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -140,7 +140,7 @@ pub struct ConfigMap { streaming_enable_bushy_join: bool, /// Enable arrangement backfill for streaming queries. Defaults to false. - #[parameter(default = true)] + #[parameter(default = false)] streaming_use_arrangement_backfill: bool, /// Allow `jsonb` in stream key