diff --git a/src/tests/simulation/tests/integration_tests/scale/table.rs b/src/tests/simulation/tests/integration_tests/scale/table.rs index d493f1ef7c2fb..d6e20b92cf8be 100644 --- a/src/tests/simulation/tests/integration_tests/scale/table.rs +++ b/src/tests/simulation/tests/integration_tests/scale/table.rs @@ -17,7 +17,7 @@ use std::iter::repeat_with; use anyhow::Result; use itertools::Itertools; use risingwave_simulation::cluster::{Cluster, Configuration}; -use risingwave_simulation::ctl_ext::predicate::identity_contains; +use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains}; const ROOT_TABLE_CREATE: &str = "create table t (v1 int);"; const MV1: &str = "create materialized view m1 as select * from t;"; @@ -85,3 +85,39 @@ async fn test_mv_on_scaled_table() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_scale_on_schema_change() -> Result<()> { + let mut cluster = Cluster::start(Configuration::for_scale()).await?; + cluster.run(ROOT_TABLE_CREATE).await?; + + cluster.run(MV1).await?; + + let fragment = cluster + .locate_one_fragment([identity_contains("materialize"), identity_contains("union")]) + .await?; + + cluster + .reschedule(fragment.reschedule([0, 2, 4], [])) + .await?; + + insert_and_flush!(cluster); + + cluster.run("alter table t add column v2 int").await?; + + let fragment = cluster + .locate_one_fragment([identity_contains("materialize"), identity_contains("StreamTableScan")]) + .await?; + + cluster.reschedule_resolve_no_shuffle(fragment.reschedule([1], [0, 4])).await?; + + let fragment = cluster + .locate_one_fragment([identity_contains("materialize"), identity_contains("union")]) + .await?; + let (_, used) = fragment.parallel_unit_usage(); + assert_eq!(used.len(), 4); + + insert_and_flush!(cluster); + + Ok(()) +}