Skip to content

Commit

Permalink
add intgration test
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 committed Jan 8, 2024
1 parent 4601cd9 commit 5718950
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion src/tests/simulation/tests/integration_tests/scale/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::iter::repeat_with;
use anyhow::Result;
use itertools::Itertools;
use risingwave_simulation::cluster::{Cluster, Configuration};
use risingwave_simulation::ctl_ext::predicate::identity_contains;
use risingwave_simulation::ctl_ext::predicate::{identity_contains, no_identity_contains};

const ROOT_TABLE_CREATE: &str = "create table t (v1 int);";
const MV1: &str = "create materialized view m1 as select * from t;";
Expand Down Expand Up @@ -85,3 +85,39 @@ async fn test_mv_on_scaled_table() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_scale_on_schema_change() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
cluster.run(ROOT_TABLE_CREATE).await?;

cluster.run(MV1).await?;

let fragment = cluster
.locate_one_fragment([identity_contains("materialize"), identity_contains("union")])
.await?;

cluster
.reschedule(fragment.reschedule([0, 2, 4], []))
.await?;

insert_and_flush!(cluster);

cluster.run("alter table t add column v2 int").await?;

let fragment = cluster
.locate_one_fragment([identity_contains("materialize"), identity_contains("StreamTableScan")])
.await?;

cluster.reschedule_resolve_no_shuffle(fragment.reschedule([1], [0, 4])).await?;

let fragment = cluster
.locate_one_fragment([identity_contains("materialize"), identity_contains("union")])
.await?;
let (_, used) = fragment.parallel_unit_usage();
assert_eq!(used.len(), 4);

insert_and_flush!(cluster);

Ok(())
}

0 comments on commit 5718950

Please sign in to comment.