From b6550f8710c3c2f8959a89f7d0be80ca5ae03031 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 12:32:46 +0800 Subject: [PATCH 01/10] enable arrangement backfill --- src/common/src/session_config/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index daef5faf1e240..3d7af60be6ec5 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -135,7 +135,7 @@ pub struct ConfigMap { streaming_enable_bushy_join: bool, /// Enable arrangement backfill for streaming queries. Defaults to false. - #[parameter(default = false)] + #[parameter(default = true)] streaming_enable_arrangement_backfill: bool, /// Allow `jsonb` in stream key From 7b56f3d229cd79ddb9064b579f20a9c98b673bff Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 13:27:36 +0800 Subject: [PATCH 02/10] test --- src/tests/simulation/src/cluster.rs | 4 ++-- .../tests/integration_tests/scale/auto_parallelism.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 9b3e1a7ad7369..7997638c7e347 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -170,8 +170,8 @@ metrics_level = "Disabled" meta_nodes: 1, compactor_nodes: 1, compute_node_cores: 2, - per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()] - .into(), + // per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()] + // .into(), ..Default::default() } } diff --git a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs index 242d699c8259d..a92e8872afde9 100644 --- a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs @@ -593,7 +593,7 @@ async fn test_compatibility_with_low_level() -> Result<()> { let hash_join_fragment_id = hash_join_fragment.id(); - // manual scale in m_simple materialize fragment + // manual scale in m_join materialize fragment cluster .reschedule_resolve_no_shuffle(format!( "{hash_join_fragment_id}-[{chosen_parallel_unit_a}]" From 0767fb02f0205a0dd2553964d80fe00e273f958b Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 18:16:16 +0800 Subject: [PATCH 03/10] docs --- src/meta/src/model/stream.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 139581d46f43f..83c93eb807036 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -41,10 +41,20 @@ use crate::stream::{build_actor_connector_splits, build_actor_split_impls, Split /// Column family name for table fragments. const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments"; +/// The parallelism for a `TableFragments`. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum TableParallelism { + /// This is when the system decides the parallelism, based on the available parallel units. Auto, + /// We set this when the `TableFragments` parallelism is changed. + /// All fragments which are part of the `TableFragment` will have the same parallelism as this. Fixed(usize), + /// We set this when the individual parallelisms of the `Fragments` + /// can differ within a `TableFragments`. + /// This is set for `risectl`, since it has a low-level interface, + /// scale individual `Fragments` within `TableFragments`. + /// When that happens, the `TableFragments` no longer has a consistent + /// parallelism, so we set this to indicate that. Custom, } From 9e0b66815ebb07d23948245a7adb197ebdafd898 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 19:09:17 +0800 Subject: [PATCH 04/10] add arrangement_backfill for auto_parallelism --- src/tests/simulation/src/cluster.rs | 8 +- .../scale/auto_parallelism.rs | 117 +++++++++++++++++- 2 files changed, 119 insertions(+), 6 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 7997638c7e347..f78548044a6ad 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -170,8 +170,12 @@ metrics_level = "Disabled" meta_nodes: 1, compactor_nodes: 1, compute_node_cores: 2, - // per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()] - // .into(), + per_session_queries: vec![ + "SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into(), + "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(), + "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(), + ] + .into(), ..Default::default() } } diff --git a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs index a92e8872afde9..67b0283641b2f 100644 --- a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs @@ -505,10 +505,6 @@ async fn test_compatibility_with_low_level() -> Result<()> { )) .await; - // helper views - session.run("create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;").await?; - session.run("create view mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;").await?; - session.run("create table t(v int);").await?; // single fragment downstream @@ -626,3 +622,116 @@ async fn test_compatibility_with_low_level() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<()> { + let config = Configuration::for_auto_parallelism( + MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, + false, + true, + ); + let mut cluster = Cluster::start(config.clone()).await?; + let mut session = cluster.start_session(); + + // Keep one worker reserved for adding later. + let select_worker = "compute-2"; + cluster + .simple_kill_nodes(vec![select_worker.to_string()]) + .await; + + sleep(Duration::from_secs( + MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2, + )) + .await; + + session.run("create table t(v int);").await?; + + // Streaming arrangement backfill + session.run("SET streaming_enable_arrangement_backfill = true;").await?; + session + .run("create materialized view m_simple as select * from t;") + .await?; + + session + .run("select parallelism from table_parallelism") + .await? + .assert_result_eq("AUTO"); + + // Find the table materialize fragment + let table_mat_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("materialize"), + identity_contains("union"), + ]) + .await?; + + let (mut all_parallel_units, _) = table_mat_fragment.parallel_unit_usage(); + + let chosen_parallel_unit_a = all_parallel_units.pop().unwrap(); + let chosen_parallel_unit_b = all_parallel_units.pop().unwrap(); + + let table_mat_fragment_id = table_mat_fragment.id(); + + // manual scale in table materialize fragment + cluster + .reschedule(format!( + "{table_mat_fragment_id}-[{chosen_parallel_unit_a}]", + )) + .await?; + + session + .run("select parallelism from table_parallelism") + .await? + .assert_result_eq("CUSTOM"); + + // Upstream changes should not affect downstream. + session + .run("select parallelism from mview_parallelism where name = 'm_simple'") + .await? + .assert_result_eq("AUTO"); + + // Find the table fragment for materialized view + let simple_mv_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("materialize"), + identity_contains("StreamTableScan"), + ]) + .await?; + + let simple_mv_fragment_id = simple_mv_fragment.id(); + + // manual scale in m_simple materialize fragment + cluster + .reschedule_resolve_no_shuffle(format!( + "{simple_mv_fragment_id}-[{chosen_parallel_unit_b}]", + )) + .await?; + + // The downstream table fragment should be separate from the upstream table fragment. + session + .run("select parallelism from mview_parallelism where name = 'm_simple'") + .await? + .assert_result_eq("CUSTOM"); + + let before_fragment_parallelism = session + .run("select fragment_id, parallelism from rw_fragments order by fragment_id;") + .await?; + + cluster + .simple_restart_nodes(vec![select_worker.to_string()]) + .await; + + sleep(Duration::from_secs( + MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2, + )) + .await; + + let after_fragment_parallelism = session + .run("select fragment_id, parallelism from rw_fragments order by fragment_id;") + .await?; + + assert_eq!(before_fragment_parallelism, after_fragment_parallelism); + + Ok(()) +} + From b16843221f94e237bdd983aa4aee64095df20d8c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 20:02:13 +0800 Subject: [PATCH 05/10] check if arrangement backfill is enabled, fix cascade_materialized_view test --- .../scale/cascade_materialized_view.rs | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs index 6e1c1333007d3..3c96f4c35766c 100644 --- a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs +++ b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs @@ -32,6 +32,8 @@ const MV5: &str = "create materialized view m5 as select * from m4;"; async fn test_simple_cascade_materialized_view() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); + let arrangement_backfill_is_enabled = session.run("show streaming_enable_arrangement_backfill").await?; + let arrangement_backfill_is_enabled = arrangement_backfill_is_enabled == "true"; session.run(ROOT_TABLE_CREATE).await?; session.run(MV1).await?; @@ -57,10 +59,25 @@ async fn test_simple_cascade_materialized_view() -> Result<()> { .locate_one_fragment([identity_contains("StreamTableScan")]) .await?; - assert_eq!( - chain_fragment.inner.actors.len(), - fragment.inner.actors.len() - ); + if arrangement_backfill_is_enabled { + // The chain fragment is in a different table fragment. + assert_eq!( + chain_fragment.inner.actors.len(), + 6, + ); + // The upstream materialized fragment should be scaled in + assert_eq!( + fragment.inner.actors.len(), + 1, + ); + } else { + // No shuffle, so the fragment of upstream materialized node is the same + // as stream table scan. + assert_eq!( + chain_fragment.inner.actors.len(), + fragment.inner.actors.len() + ); + } session .run(&format!( From 2be5453bb7152ea2f4cebe8683a0fd02f0a1660a Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 21:22:49 +0800 Subject: [PATCH 06/10] refactor set arrangement backfill --- src/tests/simulation/src/cluster.rs | 8 +++++--- .../integration_tests/scale/cascade_materialized_view.rs | 3 +-- .../simulation/tests/integration_tests/scale/plan.rs | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index f78548044a6ad..74e5eea14d7c9 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -128,8 +128,6 @@ impl Configuration { meta_nodes: 3, compactor_nodes: 2, compute_node_cores: 2, - per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()] - .into(), ..Default::default() } } @@ -171,7 +169,6 @@ metrics_level = "Disabled" compactor_nodes: 1, compute_node_cores: 2, per_session_queries: vec![ - "SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into(), "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(), "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(), ] @@ -823,6 +820,11 @@ impl Session { self.run("FLUSH").await?; Ok(()) } + + pub async fn is_arrangement_backfill_enabled(&mut self) -> Result { + let result = self.run("show streaming_enable_arrangement_backfill").await?; + Ok(result == "true") + } } /// Options for killing nodes. diff --git a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs index 3c96f4c35766c..29df5fa001a72 100644 --- a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs +++ b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs @@ -32,8 +32,7 @@ const MV5: &str = "create materialized view m5 as select * from m4;"; async fn test_simple_cascade_materialized_view() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); - let arrangement_backfill_is_enabled = session.run("show streaming_enable_arrangement_backfill").await?; - let arrangement_backfill_is_enabled = arrangement_backfill_is_enabled == "true"; + let arrangement_backfill_is_enabled = session.is_arrangement_backfill_enabled().await?; session.run(ROOT_TABLE_CREATE).await?; session.run(MV1).await?; diff --git a/src/tests/simulation/tests/integration_tests/scale/plan.rs b/src/tests/simulation/tests/integration_tests/scale/plan.rs index 24b294ad39c6f..b07c4e39a47e9 100644 --- a/src/tests/simulation/tests/integration_tests/scale/plan.rs +++ b/src/tests/simulation/tests/integration_tests/scale/plan.rs @@ -177,6 +177,7 @@ async fn test_resize_single() -> Result<()> { async fn test_resize_single_failed() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); + session.run("SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false").await?; session.run("create table t (v int);").await?; session From 26a29de5da81b97505600b07a1157f46426f2185 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 22:12:32 +0800 Subject: [PATCH 07/10] disable tests selectively --- src/tests/simulation/src/cluster.rs | 22 +++++++++++++++++++ .../scale/auto_parallelism.rs | 1 + .../integration_tests/scale/no_shuffle.rs | 6 ++--- .../tests/integration_tests/scale/plan.rs | 5 ++--- .../tests/integration_tests/scale/table.rs | 2 +- 5 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 74e5eea14d7c9..7c9889068802c 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -131,6 +131,28 @@ impl Configuration { ..Default::default() } } + pub fn for_scale_no_shuffle() -> Self { + // Embed the config file and create a temporary file at runtime. The file will be deleted + // automatically when it's dropped. + let config_path = { + let mut file = + tempfile::NamedTempFile::new().expect("failed to create temp config file"); + file.write_all(include_bytes!("risingwave-scale.toml")) + .expect("failed to write config file"); + file.into_temp_path() + }; + + Configuration { + config_path: ConfigPath::Temp(config_path.into()), + frontend_nodes: 2, + compute_nodes: 3, + meta_nodes: 3, + compactor_nodes: 2, + compute_node_cores: 2, + per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()].into(), + ..Default::default() + } + } pub fn for_auto_parallelism( max_heartbeat_interval_secs: u64, diff --git a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs index 67b0283641b2f..5f1154addeb5f 100644 --- a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs @@ -493,6 +493,7 @@ async fn test_compatibility_with_low_level() -> Result<()> { ); let mut cluster = Cluster::start(config.clone()).await?; let mut session = cluster.start_session(); + session.run("SET streaming_enable_arrangement_backfill = false;").await?; // Keep one worker reserved for adding later. let select_worker = "compute-2"; diff --git a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs index c269be2c0fa34..a409443371ba9 100644 --- a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs +++ b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs @@ -20,7 +20,7 @@ use risingwave_simulation::utils::AssertResult; #[tokio::test] async fn test_delta_join() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("set rw_implicit_flush = true;").await?; @@ -125,7 +125,7 @@ async fn test_delta_join() -> Result<()> { #[tokio::test] async fn test_share_multiple_no_shuffle_upstream() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("create table t (a int, b int);").await?; @@ -145,7 +145,7 @@ async fn test_share_multiple_no_shuffle_upstream() -> Result<()> { #[tokio::test] async fn test_resolve_no_shuffle_upstream() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("create table t (v int);").await?; diff --git a/src/tests/simulation/tests/integration_tests/scale/plan.rs b/src/tests/simulation/tests/integration_tests/scale/plan.rs index b07c4e39a47e9..e1d8148632c62 100644 --- a/src/tests/simulation/tests/integration_tests/scale/plan.rs +++ b/src/tests/simulation/tests/integration_tests/scale/plan.rs @@ -175,9 +175,8 @@ async fn test_resize_single() -> Result<()> { #[tokio::test] async fn test_resize_single_failed() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); - session.run("SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false").await?; session.run("create table t (v int);").await?; session @@ -246,7 +245,7 @@ async fn test_resize_single_failed() -> Result<()> { } #[tokio::test] async fn test_resize_no_shuffle() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("create table t (v int);").await?; diff --git a/src/tests/simulation/tests/integration_tests/scale/table.rs b/src/tests/simulation/tests/integration_tests/scale/table.rs index 04293c1701bc1..6bceb5c07b536 100644 --- a/src/tests/simulation/tests/integration_tests/scale/table.rs +++ b/src/tests/simulation/tests/integration_tests/scale/table.rs @@ -88,7 +88,7 @@ async fn test_mv_on_scaled_table() -> Result<()> { #[tokio::test] async fn test_scale_on_schema_change() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; cluster.run(ROOT_TABLE_CREATE).await?; cluster.run(MV1).await?; From 03ba6bfc94476608612457dd15f9080de608ce81 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 15 Jan 2024 22:15:03 +0800 Subject: [PATCH 08/10] fmt --- src/tests/simulation/src/cluster.rs | 8 ++++++-- .../integration_tests/scale/auto_parallelism.rs | 13 ++++++++----- .../scale/cascade_materialized_view.rs | 10 ++-------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 7c9889068802c..e1b85a6f52105 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -131,6 +131,7 @@ impl Configuration { ..Default::default() } } + pub fn for_scale_no_shuffle() -> Self { // Embed the config file and create a temporary file at runtime. The file will be deleted // automatically when it's dropped. @@ -149,7 +150,8 @@ impl Configuration { meta_nodes: 3, compactor_nodes: 2, compute_node_cores: 2, - per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()].into(), + per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()] + .into(), ..Default::default() } } @@ -844,7 +846,9 @@ impl Session { } pub async fn is_arrangement_backfill_enabled(&mut self) -> Result { - let result = self.run("show streaming_enable_arrangement_backfill").await?; + let result = self + .run("show streaming_enable_arrangement_backfill") + .await?; Ok(result == "true") } } diff --git a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs index 5f1154addeb5f..c042451f286fe 100644 --- a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs @@ -493,7 +493,9 @@ async fn test_compatibility_with_low_level() -> Result<()> { ); let mut cluster = Cluster::start(config.clone()).await?; let mut session = cluster.start_session(); - session.run("SET streaming_enable_arrangement_backfill = false;").await?; + session + .run("SET streaming_enable_arrangement_backfill = false;") + .await?; // Keep one worker reserved for adding later. let select_worker = "compute-2"; @@ -643,12 +645,14 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result< sleep(Duration::from_secs( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2, )) - .await; + .await; session.run("create table t(v int);").await?; // Streaming arrangement backfill - session.run("SET streaming_enable_arrangement_backfill = true;").await?; + session + .run("SET streaming_enable_arrangement_backfill = true;") + .await?; session .run("create materialized view m_simple as select * from t;") .await?; @@ -725,7 +729,7 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result< sleep(Duration::from_secs( MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2, )) - .await; + .await; let after_fragment_parallelism = session .run("select fragment_id, parallelism from rw_fragments order by fragment_id;") @@ -735,4 +739,3 @@ async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result< Ok(()) } - diff --git a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs index 29df5fa001a72..981f79103403d 100644 --- a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs +++ b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs @@ -60,15 +60,9 @@ async fn test_simple_cascade_materialized_view() -> Result<()> { if arrangement_backfill_is_enabled { // The chain fragment is in a different table fragment. - assert_eq!( - chain_fragment.inner.actors.len(), - 6, - ); + assert_eq!(chain_fragment.inner.actors.len(), 6,); // The upstream materialized fragment should be scaled in - assert_eq!( - fragment.inner.actors.len(), - 1, - ); + assert_eq!(fragment.inner.actors.len(), 1,); } else { // No shuffle, so the fragment of upstream materialized node is the same // as stream table scan. From 4824f54c4b95c5cd74003e75b4fac07c5c38f24a Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Tue, 16 Jan 2024 12:39:05 +0800 Subject: [PATCH 09/10] revert to default --- src/common/src/session_config/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 3d7af60be6ec5..daef5faf1e240 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -135,7 +135,7 @@ pub struct ConfigMap { streaming_enable_bushy_join: bool, /// Enable arrangement backfill for streaming queries. Defaults to false. - #[parameter(default = true)] + #[parameter(default = false)] streaming_enable_arrangement_backfill: bool, /// Allow `jsonb` in stream key From 9e8400d866f9d1faa3bcaf3c09596126fb93d517 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 16 Jan 2024 15:21:34 +0800 Subject: [PATCH 10/10] doc --- src/tests/simulation/src/cluster.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index e1b85a6f52105..d7b45cd2a6b1d 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -109,7 +109,7 @@ impl Default for Configuration { } impl Configuration { - /// Returns the config for scale test. + /// Returns the configuration for scale test. pub fn for_scale() -> Self { // Embed the config file and create a temporary file at runtime. The file will be deleted // automatically when it's dropped. @@ -132,6 +132,8 @@ impl Configuration { } } + /// Provides a configuration for scale test which ensures that the arrangement backfill is disabled, + /// so table scan will use `no_shuffle`. pub fn for_scale_no_shuffle() -> Self { // Embed the config file and create a temporary file at runtime. The file will be deleted // automatically when it's dropped.