diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index f31eb1ec8609c..b4849c877f8ec 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -17,6 +17,7 @@ use std::time::Duration; use anyhow::Result; use itertools::Itertools; use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session}; +use risingwave_simulation::utils::AssertResult; use tokio::time::sleep; const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);"; @@ -81,54 +82,62 @@ fn init_logger() { #[tokio::test] async fn test_background_mv_barrier_recovery() -> Result<()> { - init_logger(); - let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?; + let mut cluster = Cluster::start(Configuration::for_scale()).await?; let mut session = cluster.start_session(); - session.run(CREATE_TABLE).await?; - session.run(SEED_TABLE).await?; - session.flush().await?; - session.run(SET_BACKGROUND_DDL).await?; - session.run(SET_RATE_LIMIT_2).await?; - session.run(CREATE_MV1).await?; + session.run("CREATE TABLE t1 (v1 int);").await?; + session + .run("INSERT INTO t1 select * from generate_series(1, 400000)") + .await?; + session.run("flush").await?; + session.run("SET BACKGROUND_DDL=true;").await?; + session + .run("create materialized view m1 as select * from t1;") + .await?; // If the CN is killed before first barrier pass for the MV, the MV will be dropped. // This is because it's table fragments will NOT be committed until first barrier pass. - sleep(Duration::from_secs(3)).await; + sleep(Duration::from_secs(5)).await; kill_cn_and_wait_recover(&cluster).await; // Send some upstream updates. - session.run(SEED_TABLE).await?; - session.flush().await?; + cluster + .run("INSERT INTO t1 select * from generate_series(1, 100000);") + .await?; + cluster.run("flush;").await?; kill_cn_and_wait_recover(&cluster).await; kill_and_wait_recover(&cluster).await; // Send some upstream updates. - session.run(SEED_TABLE).await?; - session.flush().await?; + cluster + .run("INSERT INTO t1 select * from generate_series(1, 100000);") + .await?; + cluster.run("flush;").await?; kill_and_wait_recover(&cluster).await; kill_cn_and_wait_recover(&cluster).await; // Send some upstream updates. - session.run(SEED_TABLE).await?; - session.flush().await?; + cluster + .run("INSERT INTO t1 select * from generate_series(1, 100000);") + .await?; + cluster.run("flush;").await?; // Now just wait for it to complete. - session.run(WAIT).await?; - let t_inserts = session.run("SELECT COUNT(v1) FROM t").await?; - - let mv1_inserts = session.run("SELECT COUNT(v1) FROM mv1").await?; + sleep(Duration::from_secs(10)).await; - assert_eq!(t_inserts, mv1_inserts); + session + .run("SELECT COUNT(v1) FROM m1") + .await? + .assert_result_eq("700000"); // Make sure that if MV killed and restarted // it will not be dropped. - session.run("DROP MATERIALIZED VIEW mv1").await?; - session.run("DROP TABLE t").await?; + session.run("DROP MATERIALIZED VIEW m1").await?; + session.run("DROP TABLE t1").await?; Ok(()) }