diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 139581d46f43f..83c93eb807036 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -41,10 +41,20 @@ use crate::stream::{build_actor_connector_splits, build_actor_split_impls, Split /// Column family name for table fragments. const TABLE_FRAGMENTS_CF_NAME: &str = "cf/table_fragments"; +/// The parallelism for a `TableFragments`. #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum TableParallelism { + /// This is when the system decides the parallelism, based on the available parallel units. Auto, + /// We set this when the `TableFragments` parallelism is changed. + /// All fragments which are part of the `TableFragments` will have the same parallelism as this. Fixed(usize), + /// We set this when the individual parallelisms of the `Fragments` + /// can differ within a `TableFragments`. + /// This is set for `risectl`, since it has a low-level interface that can + /// scale individual `Fragments` within a `TableFragments`. + /// When that happens, the `TableFragments` no longer has a consistent + /// parallelism, so we set this to indicate that. Custom, } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index 9b3e1a7ad7369..d7b45cd2a6b1d 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -109,7 +109,7 @@ impl Default for Configuration { } impl Configuration { - /// Returns the config for scale test. + /// Returns the configuration for scale test. pub fn for_scale() -> Self { // Embed the config file and create a temporary file at runtime. The file will be deleted // automatically when it's dropped. 
@@ -128,7 +128,31 @@ impl Configuration { meta_nodes: 3, compactor_nodes: 2, compute_node_cores: 2, - per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()] + ..Default::default() + } + } + + /// Provides a configuration for scale test which ensures that the arrangement backfill is disabled, + /// so table scan will use `no_shuffle`. + pub fn for_scale_no_shuffle() -> Self { + // Embed the config file and create a temporary file at runtime. The file will be deleted + // automatically when it's dropped. + let config_path = { + let mut file = + tempfile::NamedTempFile::new().expect("failed to create temp config file"); + file.write_all(include_bytes!("risingwave-scale.toml")) + .expect("failed to write config file"); + file.into_temp_path() + }; + + Configuration { + config_path: ConfigPath::Temp(config_path.into()), + frontend_nodes: 2, + compute_nodes: 3, + meta_nodes: 3, + compactor_nodes: 2, + compute_node_cores: 2, + per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL = false;".into()] .into(), ..Default::default() } @@ -170,7 +194,10 @@ metrics_level = "Disabled" meta_nodes: 1, compactor_nodes: 1, compute_node_cores: 2, - per_session_queries: vec!["SET STREAMING_ENABLE_ARRANGEMENT_BACKFILL=false".into()] + per_session_queries: vec![ + "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(), + "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(), + ] .into(), ..Default::default() } @@ -819,6 +846,14 @@ impl Session { self.run("FLUSH").await?; Ok(()) } + + /// Returns `true` if `show streaming_enable_arrangement_backfill` reports `true` for this session. + pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> { + let result = self + .run("show streaming_enable_arrangement_backfill") + .await?; + Ok(result == "true") + } } /// Options for killing nodes. 
diff --git a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs index 242d699c8259d..c042451f286fe 100644 --- a/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs +++ b/src/tests/simulation/tests/integration_tests/scale/auto_parallelism.rs @@ -493,6 +493,9 @@ async fn test_compatibility_with_low_level() -> Result<()> { ); let mut cluster = Cluster::start(config.clone()).await?; let mut session = cluster.start_session(); + session + .run("SET streaming_enable_arrangement_backfill = false;") + .await?; // Keep one worker reserved for adding later. let select_worker = "compute-2"; @@ -505,10 +508,6 @@ async fn test_compatibility_with_low_level() -> Result<()> { )) .await; - // helper views - session.run("create view table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;").await?; - session.run("create view mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;").await?; - session.run("create table t(v int);").await?; // single fragment downstream @@ -593,7 +592,7 @@ async fn test_compatibility_with_low_level() -> Result<()> { let hash_join_fragment_id = hash_join_fragment.id(); - // manual scale in m_simple materialize fragment + // manual scale in m_join materialize fragment cluster .reschedule_resolve_no_shuffle(format!( "{hash_join_fragment_id}-[{chosen_parallel_unit_a}]" @@ -626,3 +625,117 @@ async fn test_compatibility_with_low_level() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_compatibility_with_low_level_and_arrangement_backfill() -> Result<()> { + let config = Configuration::for_auto_parallelism( + MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE, + false, + true, + ); + let mut cluster = Cluster::start(config.clone()).await?; + let mut session = cluster.start_session(); + + // Keep one worker 
reserved for adding later. + let select_worker = "compute-2"; + cluster + .simple_kill_nodes(vec![select_worker.to_string()]) + .await; + + sleep(Duration::from_secs( + MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2, + )) + .await; + + session.run("create table t(v int);").await?; + + // Streaming arrangement backfill + session + .run("SET streaming_enable_arrangement_backfill = true;") + .await?; + session + .run("create materialized view m_simple as select * from t;") + .await?; + + session + .run("select parallelism from table_parallelism") + .await? + .assert_result_eq("AUTO"); + + // Find the table materialize fragment + let table_mat_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("materialize"), + identity_contains("union"), + ]) + .await?; + + let (mut all_parallel_units, _) = table_mat_fragment.parallel_unit_usage(); + + let chosen_parallel_unit_a = all_parallel_units.pop().unwrap(); + let chosen_parallel_unit_b = all_parallel_units.pop().unwrap(); + + let table_mat_fragment_id = table_mat_fragment.id(); + + // manual scale in table materialize fragment + cluster + .reschedule(format!( + "{table_mat_fragment_id}-[{chosen_parallel_unit_a}]", + )) + .await?; + + session + .run("select parallelism from table_parallelism") + .await? + .assert_result_eq("CUSTOM"); + + // Upstream changes should not affect downstream. + session + .run("select parallelism from mview_parallelism where name = 'm_simple'") + .await? 
+ .assert_result_eq("AUTO"); + + // Find the table fragment for materialized view + let simple_mv_fragment = cluster + .locate_one_fragment(vec![ + identity_contains("materialize"), + identity_contains("StreamTableScan"), + ]) + .await?; + + let simple_mv_fragment_id = simple_mv_fragment.id(); + + // manual scale in m_simple materialize fragment + cluster + .reschedule_resolve_no_shuffle(format!( + "{simple_mv_fragment_id}-[{chosen_parallel_unit_b}]", + )) + .await?; + + // The downstream table fragment should be separate from the upstream table fragment. + session + .run("select parallelism from mview_parallelism where name = 'm_simple'") + .await? + .assert_result_eq("CUSTOM"); + + let before_fragment_parallelism = session + .run("select fragment_id, parallelism from rw_fragments order by fragment_id;") + .await?; + + cluster + .simple_restart_nodes(vec![select_worker.to_string()]) + .await; + + sleep(Duration::from_secs( + MAX_HEARTBEAT_INTERVAL_SECS_CONFIG_FOR_AUTO_SCALE * 2, + )) + .await; + + let after_fragment_parallelism = session + .run("select fragment_id, parallelism from rw_fragments order by fragment_id;") + .await?; + + assert_eq!(before_fragment_parallelism, after_fragment_parallelism); + + Ok(()) +} diff --git a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs index 6e1c1333007d3..981f79103403d 100644 --- a/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs +++ b/src/tests/simulation/tests/integration_tests/scale/cascade_materialized_view.rs @@ -32,6 +32,7 @@ const MV5: &str = "create materialized view m5 as select * from m4;"; async fn test_simple_cascade_materialized_view() -> Result<()> { let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); + let arrangement_backfill_is_enabled = session.is_arrangement_backfill_enabled().await?; 
session.run(ROOT_TABLE_CREATE).await?; session.run(MV1).await?; @@ -57,10 +58,19 @@ async fn test_simple_cascade_materialized_view() -> Result<()> { .locate_one_fragment([identity_contains("StreamTableScan")]) .await?; - assert_eq!( - chain_fragment.inner.actors.len(), - fragment.inner.actors.len() - ); + if arrangement_backfill_is_enabled { + // The chain fragment is in a different table fragment. + assert_eq!(chain_fragment.inner.actors.len(), 6,); + // The upstream materialized fragment should be scaled in + assert_eq!(fragment.inner.actors.len(), 1,); + } else { + // No shuffle, so the fragment of upstream materialized node is the same + // as stream table scan. + assert_eq!( + chain_fragment.inner.actors.len(), + fragment.inner.actors.len() + ); + } session .run(&format!( diff --git a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs index c269be2c0fa34..a409443371ba9 100644 --- a/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs +++ b/src/tests/simulation/tests/integration_tests/scale/no_shuffle.rs @@ -20,7 +20,7 @@ use risingwave_simulation::utils::AssertResult; #[tokio::test] async fn test_delta_join() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("set rw_implicit_flush = true;").await?; @@ -125,7 +125,7 @@ async fn test_delta_join() -> Result<()> { #[tokio::test] async fn test_share_multiple_no_shuffle_upstream() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("create table t (a int, b int);").await?; @@ -145,7 +145,7 @@ async fn test_share_multiple_no_shuffle_upstream() -> Result<()> { #[tokio::test] async fn 
test_resolve_no_shuffle_upstream() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("create table t (v int);").await?; diff --git a/src/tests/simulation/tests/integration_tests/scale/plan.rs b/src/tests/simulation/tests/integration_tests/scale/plan.rs index 24b294ad39c6f..e1d8148632c62 100644 --- a/src/tests/simulation/tests/integration_tests/scale/plan.rs +++ b/src/tests/simulation/tests/integration_tests/scale/plan.rs @@ -175,7 +175,7 @@ async fn test_resize_single() -> Result<()> { #[tokio::test] async fn test_resize_single_failed() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("create table t (v int);").await?; @@ -245,7 +245,7 @@ async fn test_resize_single_failed() -> Result<()> { } #[tokio::test] async fn test_resize_no_shuffle() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; let mut session = cluster.start_session(); session.run("create table t (v int);").await?; diff --git a/src/tests/simulation/tests/integration_tests/scale/table.rs b/src/tests/simulation/tests/integration_tests/scale/table.rs index 04293c1701bc1..6bceb5c07b536 100644 --- a/src/tests/simulation/tests/integration_tests/scale/table.rs +++ b/src/tests/simulation/tests/integration_tests/scale/table.rs @@ -88,7 +88,7 @@ async fn test_mv_on_scaled_table() -> Result<()> { #[tokio::test] async fn test_scale_on_schema_change() -> Result<()> { - let mut cluster = Cluster::start(Configuration::for_scale()).await?; + let mut cluster = Cluster::start(Configuration::for_scale_no_shuffle()).await?; cluster.run(ROOT_TABLE_CREATE).await?; 
cluster.run(MV1).await?;