Skip to content

Commit

Permalink
revert test_background_mv_barrier_recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Oct 31, 2023
1 parent f427a27 commit f60cef1
Showing 1 changed file with 31 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Duration;
use anyhow::Result;
use itertools::Itertools;
use risingwave_simulation::cluster::{Cluster, Configuration, KillOpts, Session};
use risingwave_simulation::utils::AssertResult;
use tokio::time::sleep;

const CREATE_TABLE: &str = "CREATE TABLE t(v1 int);";
Expand Down Expand Up @@ -81,54 +82,62 @@ fn init_logger() {

#[tokio::test]
async fn test_background_mv_barrier_recovery() -> Result<()> {
init_logger();
let mut cluster = Cluster::start(Configuration::for_background_ddl()).await?;
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();

session.run(CREATE_TABLE).await?;
session.run(SEED_TABLE).await?;
session.flush().await?;
session.run(SET_BACKGROUND_DDL).await?;
session.run(SET_RATE_LIMIT_2).await?;
session.run(CREATE_MV1).await?;
session.run("CREATE TABLE t1 (v1 int);").await?;
session
.run("INSERT INTO t1 select * from generate_series(1, 400000)")
.await?;
session.run("flush").await?;
session.run("SET BACKGROUND_DDL=true;").await?;
session
.run("create materialized view m1 as select * from t1;")
.await?;

// If the CN is killed before first barrier pass for the MV, the MV will be dropped.
// This is because it's table fragments will NOT be committed until first barrier pass.
sleep(Duration::from_secs(3)).await;
sleep(Duration::from_secs(5)).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
session.run(SEED_TABLE).await?;
session.flush().await?;
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;

kill_cn_and_wait_recover(&cluster).await;
kill_and_wait_recover(&cluster).await;

// Send some upstream updates.
session.run(SEED_TABLE).await?;
session.flush().await?;
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;

kill_and_wait_recover(&cluster).await;
kill_cn_and_wait_recover(&cluster).await;

// Send some upstream updates.
session.run(SEED_TABLE).await?;
session.flush().await?;
cluster
.run("INSERT INTO t1 select * from generate_series(1, 100000);")
.await?;
cluster.run("flush;").await?;

// Now just wait for it to complete.
session.run(WAIT).await?;

let t_inserts = session.run("SELECT COUNT(v1) FROM t").await?;

let mv1_inserts = session.run("SELECT COUNT(v1) FROM mv1").await?;
sleep(Duration::from_secs(10)).await;

assert_eq!(t_inserts, mv1_inserts);
session
.run("SELECT COUNT(v1) FROM m1")
.await?
.assert_result_eq("700000");

// Make sure that if MV killed and restarted
// it will not be dropped.

session.run("DROP MATERIALIZED VIEW mv1").await?;
session.run("DROP TABLE t").await?;
session.run("DROP MATERIALIZED VIEW m1").await?;
session.run("DROP TABLE t1").await?;

Ok(())
}
Expand Down

0 comments on commit f60cef1

Please sign in to comment.